[GitHub] [flink] danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338392894 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + + /** +* Adds column name to the registered column set. This will add nested column names recursive. +* Nested column names are qualified using "." separator. +*/ + public void addColumn(SqlNode column) throws SqlValidateException { + String columnName; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + addNestedColumn(columnName, tableColumn.getType()); + } else if (column instanceof SqlBasicCall) { + SqlBasicCall tableColumn = (SqlBasicCall) column; + columnName = tableColumn.getOperands()[1].toString(); + } else { + throw new UnsupportedOperationException("Unsupported column:" + column); + } + + addColumnName(columnName, column.getParserPosition()); + } + + /** +* Returns true if the column name is existed in the registered column set. +* This supports qualified column name using "." separator. 
+*/ + public boolean contains(String columnName) { + return allColumnNames.contains(columnName); + } + + private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException { + SqlTypeNameSpec typeName = columnType.getTypeNameSpec(); + // validate composite type + if (typeName instanceof ExtendedSqlRowTypeNameSpec) { Review comment: Then why should we allow nested fields for structured types? I don't see the difference. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
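The validator under review registers a ROW-typed column together with all of its nested fields, qualified with "." as the separator. A minimal self-contained sketch of that recursion (hypothetical names, no Flink dependency; a plain `Map` stands in for the row type):

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the ColumnValidator recursion: a ROW column registers its own
// name plus every nested field, dot-qualified (e.g. "user.address.city").
class NestedColumnSketch {
    // rowType maps field name -> either a leaf type name (String) or a nested row (Map)
    static void addNested(String prefix, Map<String, Object> rowType, Set<String> out) {
        out.add(prefix);
        for (Map.Entry<String, Object> field : rowType.entrySet()) {
            String qualified = prefix + "." + field.getKey();
            Object type = field.getValue();
            if (type instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) type;
                addNested(qualified, nested, out); // recurse into nested ROW
            } else {
                out.add(qualified);                // leaf column
            }
        }
    }
}
```

After registration, a watermark declaration on `user.address.city` can be checked with a simple set lookup, which is what the `contains` method in the diff does.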
[GitHub] [flink] wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338391937 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + + /** +* Adds column name to the registered column set. This will add nested column names recursive. +* Nested column names are qualified using "." separator. +*/ + public void addColumn(SqlNode column) throws SqlValidateException { + String columnName; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + addNestedColumn(columnName, tableColumn.getType()); + } else if (column instanceof SqlBasicCall) { + SqlBasicCall tableColumn = (SqlBasicCall) column; + columnName = tableColumn.getOperands()[1].toString(); + } else { + throw new UnsupportedOperationException("Unsupported column:" + column); + } + + addColumnName(columnName, column.getParserPosition()); + } + + /** +* Returns true if the column name is existed in the registered column set. +* This supports qualified column name using "." separator. 
+*/ + public boolean contains(String columnName) { + return allColumnNames.contains(columnName); + } + + private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException { + SqlTypeNameSpec typeName = columnType.getTypeNameSpec(); + // validate composite type + if (typeName instanceof ExtendedSqlRowTypeNameSpec) { Review comment: `array[1]` or `map['key1']` are expressions, not field references. The watermark statement design aims to have an **existing column** as the rowtime field. If it doesn't exist in the schema, we can use a computed column to derive it.
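The design described here, watermark on an existing column with a computed column to derive it when needed, can be illustrated with a DDL sketch. This assumes the syntax proposed in FLIP-66; the table name, fields, and connector options are hypothetical:

```sql
CREATE TABLE orders (
  log_ts VARCHAR,
  -- computed column: derives an actual TIMESTAMP column from the raw string
  ts AS TO_TIMESTAMP(log_ts),
  -- watermark is declared on an existing (here, computed) column,
  -- not on an expression like array[1] or map['key1']
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);
```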
[GitHub] [flink] wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338390788 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + + /** +* Adds column name to the registered column set. This will add nested column names recursive. +* Nested column names are qualified using "." separator. +*/ + public void addColumn(SqlNode column) throws SqlValidateException { + String columnName; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + addNestedColumn(columnName, tableColumn.getType()); + } else if (column instanceof SqlBasicCall) { + SqlBasicCall tableColumn = (SqlBasicCall) column; + columnName = tableColumn.getOperands()[1].toString(); + } else { + throw new UnsupportedOperationException("Unsupported column:" + column); + } + + addColumnName(columnName, column.getParserPosition()); + } + + /** +* Returns true if the column name is existed in the registered column set. +* This supports qualified column name using "." separator. 
+*/ + public boolean contains(String columnName) { + return allColumnNames.contains(columnName); + } + + private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException { + SqlTypeNameSpec typeName = columnType.getTypeNameSpec(); + // validate composite type + if (typeName instanceof ExtendedSqlRowTypeNameSpec) { Review comment: This is not allowed by the design of FLIP-66.
[GitHub] [flink] JingsongLi commented on issue #9803: [FLINK-14265][table-planner-blink] Don't use ContinuousFileReaderOperator to support multiple paths
JingsongLi commented on issue #9803: [FLINK-14265][table-planner-blink] Don't use ContinuousFileReaderOperator to support multiple paths URL: https://github.com/apache/flink/pull/9803#issuecomment-545755404

> ```
> * NOTES ON CHECKPOINTING: In the case of a {@link FileInputFormat}, the source
> * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
> * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
> * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
> * without waiting for the readers to finish reading. This implies that no more checkpoint
> * barriers are going to be forwarded after the source exits, thus having no checkpoints.
> ```
>
> It seems there is some benefit to using `ContinuousFileMonitoringFunction` in streaming mode, but the cost is that we cannot use multiple paths.

After discussing with @wuchong offline, I will use `ContinuousFileMonitoringFunction` in streaming mode (which requires streaming exactly-once) and `InputFormatSourceFunction` in batch mode (which does not need to care about checkpoints).
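The decision above can be sketched as a small helper. This is a hypothetical illustration (no Flink dependency; the class and method names are made up): continuous monitoring for streaming, where the monitor must stay alive so checkpoint barriers keep flowing, and plain input-format reading for batch, where checkpoints are not needed and multiple paths are fine.

```java
// Hypothetical sketch of the streaming/batch source split discussed in the PR.
enum ExecutionMode { STREAMING, BATCH }

class FileSourceChoice {
    static String choose(ExecutionMode mode, int pathCount) {
        if (mode == ExecutionMode.STREAMING) {
            // ContinuousFileMonitoringFunction monitors a single path and
            // forwards splits downstream, so multiple paths are not supported.
            if (pathCount > 1) {
                throw new IllegalArgumentException(
                    "streaming file source supports a single path");
            }
            return "ContinuousFileMonitoringFunction";
        }
        // Batch mode needs no checkpointing, so InputFormatSourceFunction
        // can read any number of paths.
        return "InputFormatSourceFunction";
    }
}
```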
[GitHub] [flink] wangxlong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner
wangxlong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner URL: https://github.com/apache/flink/pull/9916#discussion_r338389183 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala ## @@ -164,3 +180,96 @@ class ExpressionReducer(config: TableConfig) } } } + +/** + * A [[ConstantFunctionContext]] allows to obtain user-defined configuration information set + * in [[TableConfig]]. + * + * @param parameters User-defined configuration set in [[TableConfig]]. + */ +class ConstantFunctionContext(parameters: Configuration) extends FunctionContext(null) { + + override def getMetricGroup: MetricGroup = { +throw new UnsupportedOperationException("getMetricGroup is not supported when optimizing") + } + + override def getCachedFile(name: String): File = { +throw new UnsupportedOperationException("getCachedFile is not supported when optimizing") + } + + /** +* Gets the user-defined configuration value associated with the given key as a string. +* +* @param key key pointing to the associated value +* @param defaultValue default value which is returned in case user-defined configuration +* value is null or there is no value associated with the given key +* @return (default) value associated with the given key +*/ + override def getJobParameter(key: String, defaultValue: String): String = { +parameters.getString(key, defaultValue) + } +} + +/** + * A [[ConstantFunctionCodeGenerator]] used for constant expression code generator + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. 
+ * @param input1 type information about the first input of the Function + * @param functionContext functionContext that used for code generator + * @param parameters parameters that accessed by openFunction + */ +class ConstantFunctionCodeGenerator(config: TableConfig, +nullableInput: Boolean, +input1: TypeInformation[_ <: Any], +functionContext: FunctionContext, +parameters: String = null) + extends FunctionCodeGenerator(config, nullableInput, input1) { + + override def addReusableFunction(function: UserDefinedFunction, Review comment: Hi @wuchong, updated.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9948: [FLINK-14451][runtime] Refactor FailoverTopology to extend base topology
zhuzhurk commented on a change in pull request #9948: [FLINK-14451][runtime] Refactor FailoverTopology to extend base topology URL: https://github.com/apache/flink/pull/9948#discussion_r338388819 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverResultPartition.java ## @@ -17,41 +17,14 @@ package org.apache.flink.runtime.executiongraph.failover.flip1; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.topology.Result; /** - * A connection between {@link FailoverVertex FailoverVertices}. - * - * producer -> ResultPartition -> consumer + * Represents a {@link IntermediateResultPartition} produced by a {@link FailoverVertex}. */ -public interface FailoverEdge { - - /** -* Returns the ID of the result partition that the source produces. -* -* @return ID of the result partition that the source produces -*/ - IntermediateResultPartitionID getResultPartitionID(); - - /** -* Returns the {@link ResultPartitionType} of the produced result partition. -* -* @return type of the produced result partition -*/ - ResultPartitionType getResultPartitionType(); - - /** -* Returns the source vertex, i.e., the producer of the result partition. -* -* @return source vertex -*/ - FailoverVertex getSourceVertex(); - - /** -* Returns the target vertex, i.e., the consumer of the result partition. -* -* @return target vertex -*/ - FailoverVertex getTargetVertex(); +public interface FailoverResultPartition, R extends FailoverResultPartition> Review comment: @tillrohrmann should we keep the generics for FailoverTopology/SchedulingTopology, take the alternative, or do you have any other ideas?
[GitHub] [flink] danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338387696 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + + /** +* Adds column name to the registered column set. This will add nested column names recursive. +* Nested column names are qualified using "." separator. +*/ + public void addColumn(SqlNode column) throws SqlValidateException { + String columnName; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + addNestedColumn(columnName, tableColumn.getType()); + } else if (column instanceof SqlBasicCall) { + SqlBasicCall tableColumn = (SqlBasicCall) column; + columnName = tableColumn.getOperands()[1].toString(); + } else { + throw new UnsupportedOperationException("Unsupported column:" + column); + } + + addColumnName(columnName, column.getParserPosition()); + } + + /** +* Returns true if the column name is existed in the registered column set. +* This supports qualified column name using "." separator. 
+*/ + public boolean contains(String columnName) { + return allColumnNames.contains(columnName); + } + + private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException { + SqlTypeNameSpec typeName = columnType.getTypeNameSpec(); + // validate composite type + if (typeName instanceof ExtendedSqlRowTypeNameSpec) { Review comment: Why do you think a rowtime field "should not be" an `array[1]` or `map['key1']`?
[GitHub] [flink] flinkbot edited a comment on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
flinkbot edited a comment on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#issuecomment-542617499

## CI report:

* f20e67a260971f17d77c7ecc7c55b04136e432c5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132123750)
* 3c1480f11f084fb2d24618cb8d8f1b992c955f96 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133312035)

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338386955 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + + /** +* Adds column name to the registered column set. This will add nested column names recursive. +* Nested column names are qualified using "." separator. +*/ + public void addColumn(SqlNode column) throws SqlValidateException { + String columnName; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + addNestedColumn(columnName, tableColumn.getType()); + } else if (column instanceof SqlBasicCall) { + SqlBasicCall tableColumn = (SqlBasicCall) column; + columnName = tableColumn.getOperands()[1].toString(); + } else { + throw new UnsupportedOperationException("Unsupported column:" + column); + } + + addColumnName(columnName, column.getParserPosition()); + } + + /** +* Returns true if the column name is existed in the registered column set. +* This supports qualified column name using "." separator. 
+*/ + public boolean contains(String columnName) { + return allColumnNames.contains(columnName); + } + + private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException { + SqlTypeNameSpec typeName = columnType.getTypeNameSpec(); + // validate composite type + if (typeName instanceof ExtendedSqlRowTypeNameSpec) { Review comment: What I mean by "not supported" is that the rowtime field should not be an `array[1]` or `map['key1']`, and this is guaranteed by the SQL parser for identifiers. We can of course allow `ARRAY` and `MULTISET` as nested types.
[GitHub] [flink] flinkbot edited a comment on issue #9542: [FLINK-13873][metrics] Change the column family as tags for influxdb …
flinkbot edited a comment on issue #9542: [FLINK-13873][metrics] Change the column family as tags for influxdb … URL: https://github.com/apache/flink/pull/9542#issuecomment-525276537

## CI report:

* e8636926351f3d406962dcadba275e20e49aff39 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/124736151)
* d23ea97e8419bbacea0698b3ba82a459d940cf38 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128606376)
* 05977cd49eb306d768668be4e8cb31034343df02 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128606947)
* 057d453e5e00656bcfcb87d1d69172f614b2d11f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128681895)
* 5001042b5fc5202da14c06e2d21faf1427f50a66 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128683546)
* 3a1e39e889daf0fca82c54982911ded35df6cb77 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128692102)
* 2745a3cb02e601a1a26360e9d6b3f0af5428c66a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129027621)
* e108aa24e739d6f48f63efc560c3fc049b509860 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131172797)
* 24a270d468dea677379713d5cf402ea453d9f222 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131246042)
* acf1c2b9add8c3b903a8485ed41c9f0b18d97729 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131260202)
* 2f5f28b6bc2c24e81b330a3ad65cde2a23a1af95 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/132564124)
* 1d5388df33162240b94852f742ce6d7d7a468be4 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132974400)
* 10d9ad9fcc229d5612333d88cc7b6493224d3411 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133036651)
* 1e5b6fec37e737bf136589e9df5da816c87ee191 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133106171)
* 4869a205244c28cd57a59f25c28e2f49d4fb296c : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133315316)
[GitHub] [flink] flinkbot edited a comment on issue #9980: [WIP][FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods
flinkbot edited a comment on issue #9980: [WIP][FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods URL: https://github.com/apache/flink/pull/9980#issuecomment-545692886

## CI report:

* 413eea83e59307f0709a9d4e055dcd9015c846d5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133297914)
* c249ada8fb90b1625778e37ca91fbd1dc361417f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133308181)
[GitHub] [flink] flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#issuecomment-539878774

## CI report:

* c8bf66050c865904c402bdc9c079a6e4de1a064c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131080663)
* ec84f32bdcae0738d97ac60090691789334a279a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133306001)
* 6a12e5941baf73958146f54365ff3f267c696e11 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133310272)
[GitHub] [flink] flinkbot edited a comment on issue #9542: [FLINK-13873][metrics] Change the column family as tags for influxdb …
flinkbot edited a comment on issue #9542: [FLINK-13873][metrics] Change the column family as tags for influxdb … URL: https://github.com/apache/flink/pull/9542#issuecomment-525276537

## CI report:

* e8636926351f3d406962dcadba275e20e49aff39 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/124736151)
* d23ea97e8419bbacea0698b3ba82a459d940cf38 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128606376)
* 05977cd49eb306d768668be4e8cb31034343df02 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128606947)
* 057d453e5e00656bcfcb87d1d69172f614b2d11f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128681895)
* 5001042b5fc5202da14c06e2d21faf1427f50a66 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128683546)
* 3a1e39e889daf0fca82c54982911ded35df6cb77 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128692102)
* 2745a3cb02e601a1a26360e9d6b3f0af5428c66a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129027621)
* e108aa24e739d6f48f63efc560c3fc049b509860 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131172797)
* 24a270d468dea677379713d5cf402ea453d9f222 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131246042)
* acf1c2b9add8c3b903a8485ed41c9f0b18d97729 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131260202)
* 2f5f28b6bc2c24e81b330a3ad65cde2a23a1af95 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/132564124)
* 1d5388df33162240b94852f742ce6d7d7a468be4 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132974400)
* 10d9ad9fcc229d5612333d88cc7b6493224d3411 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133036651)
* 1e5b6fec37e737bf136589e9df5da816c87ee191 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133106171)
* 4869a205244c28cd57a59f25c28e2f49d4fb296c : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #9977: [FLINK-14497][python] Support primitive data types in Python user-defined functions
flinkbot edited a comment on issue #9977: [FLINK-14497][python] Support primitive data types in Python user-defined functions URL: https://github.com/apache/flink/pull/9977#issuecomment-545444286

## CI report:

* 89d33a5aeefd6934261cc185c306c2e55b6c8ad2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133195423)
* 982528a22bd2cfdfdb367561464079946d158a4a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133219672)
* e8feba0292bd3bd15320ba91f1ee1ef61aaf540f : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133308166)
[GitHub] [flink] flinkbot edited a comment on issue #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
flinkbot edited a comment on issue #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#issuecomment-57625

## CI report:

* b23539ddc208cdfe371759a4ba3ef7d7dc3b : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/132790733)
* 134ce2294bd7f72d99286f0687801cf6290be079 : UNKNOWN
* c09b396a33664e21ff6260d9de294dceaf924f0d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132797062)
* c6c8f65b4fddefe6b09f67546126af91c8b122ca : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133175177)
* 10f069d823bcd11709a2917ee9da17355ff1d97c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133312058)
[GitHub] [flink] flinkbot edited a comment on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
flinkbot edited a comment on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#issuecomment-542617499

## CI report:

* f20e67a260971f17d77c7ecc7c55b04136e432c5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132123750)
* 3c1480f11f084fb2d24618cb8d8f1b992c955f96 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133312035)
[GitHub] [flink] flinkbot edited a comment on issue #9894: [FLINK-14342][table][python] Remove method FunctionDefinition#getLanguage.
flinkbot edited a comment on issue #9894: [FLINK-14342][table][python] Remove method FunctionDefinition#getLanguage. URL: https://github.com/apache/flink/pull/9894#issuecomment-541559889

## CI report:

* 89595e0af27e0ae61f0fcd956ec422f203a3ab95 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131751296)
* b1aed08d2bbeaf73f6dc2a293f912b368319531b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131758748)
* e9c95e6f51ff45b69df5d248782921aa8e6edaab : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132077078)
* 9c05be8a14dbf6b31fd24f6114be36e1adec3b3a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132082371)
* a27700b7e163407788a4aac0d66d18d0c0efdefc : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133308152)
[GitHub] [flink] JingsongLi commented on issue #9803: [FLINK-14265][table-planner-blink] Don't use ContinuousFileReaderOperator to support multiple paths
JingsongLi commented on issue #9803: [FLINK-14265][table-planner-blink] Don't use ContinuousFileReaderOperator to support multiple paths URL: https://github.com/apache/flink/pull/9803#issuecomment-545736961 ``` * NOTES ON CHECKPOINTING: In the case of a {@link FileInputFormat}, the source * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits, * without waiting for the readers to finish reading. This implies that no more checkpoint * barriers are going to be forwarded after the source exits, thus having no checkpoints. ``` It seems that there are some benefits to using `ContinuousFileMonitoringFunction` in streaming mode, but the cost is that we cannot use multiple paths. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
flinkbot edited a comment on issue #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#issuecomment-57625 ## CI report: * b23539ddc208cdfe371759a4ba3ef7d7dc3b : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/132790733) * 134ce2294bd7f72d99286f0687801cf6290be079 : UNKNOWN * c09b396a33664e21ff6260d9de294dceaf924f0d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132797062) * c6c8f65b4fddefe6b09f67546126af91c8b122ca : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133175177) * 10f069d823bcd11709a2917ee9da17355ff1d97c : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
flinkbot edited a comment on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#issuecomment-542617499 ## CI report: * f20e67a260971f17d77c7ecc7c55b04136e432c5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132123750) * 3c1480f11f084fb2d24618cb8d8f1b992c955f96 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
flinkbot edited a comment on issue #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#issuecomment-542062879 ## CI report: * efdb2ad3942939590bbb0203ab04bae5fec634a9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131912243) * a30831a94681f1332648b34ce2978f883d4f33d5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133306011) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#issuecomment-539878774 ## CI report: * c8bf66050c865904c402bdc9c079a6e4de1a064c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131080663) * ec84f32bdcae0738d97ac60090691789334a279a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133306001) * 6a12e5941baf73958146f54365ff3f267c696e11 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133310272) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al…
flinkbot edited a comment on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al… URL: https://github.com/apache/flink/pull/9300#issuecomment-516849331 ## CI report: * 9b5b5ff5df053498e491d43a04f44d5ba452579c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/121415160) * eb6cf333f4331ed20d7e22a056cbd3c9b61f31f8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124204150) * 6440190df24a227f96e2b917acecccee04ab981b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128458494) * de471a0446423d026ae75476f1e4892126668a40 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128501969) * c377de8dd42673a3bd229e9e05e28cbd1c1863d7 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129638886) * 5dde9734d42c1f427ad574a4d5fdbd4bd4f35fdd : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132989695) * 9dd25a6341c3890794f3c327b9e94589fee70f52 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133133237) * f3a7855f74b9708e25e19fcca58e18abb64614e2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133308139) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Mrart commented on issue #9542: [FLINK-13873][metrics] Change the column family as tags for influxdb …
Mrart commented on issue #9542: [FLINK-13873][metrics] Change the column family as tags for influxdb … URL: https://github.com/apache/flink/pull/9542#issuecomment-545732095 > Nearly there ;) Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338371878 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + + /** +* Adds column name to the registered column set. This will add nested column names recursive. +* Nested column names are qualified using "." separator. +*/ + public void addColumn(SqlNode column) throws SqlValidateException { + String columnName; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + addNestedColumn(columnName, tableColumn.getType()); + } else if (column instanceof SqlBasicCall) { + SqlBasicCall tableColumn = (SqlBasicCall) column; + columnName = tableColumn.getOperands()[1].toString(); + } else { + throw new UnsupportedOperationException("Unsupported column:" + column); + } + + addColumnName(columnName, column.getParserPosition()); + } + + /** +* Returns true if the column name is existed in the registered column set. +* This supports qualified column name using "." separator. 
+*/ + public boolean contains(String columnName) { return allColumnNames.contains(columnName); } + + private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException { SqlTypeNameSpec typeName = columnType.getTypeNameSpec(); // validate composite type + if (typeName instanceof ExtendedSqlRowTypeNameSpec) { Review comment: If that is the case, we should match the `ARRAY` and `MULTISET` types explicitly in the `ColumnValidator` and throw a readable error message. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
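For readers following the thread: the review concerns how `ColumnValidator` registers "."-qualified nested column names and which composite types it should recurse into. The sketch below is a self-contained illustration of that registration logic, using a toy type model (plain string type labels, not the actual Flink/Calcite `SqlDataTypeSpec` classes) and assuming only ROW types contribute nested names:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative stand-in for the ColumnValidator under review; not the real Flink class. */
class ColumnValidatorSketch {

    /** Toy type model: a "ROW" carries named fields; every other type is atomic. */
    static final class Type {
        final String name;              // e.g. "ROW", "ARRAY", "INT" (hypothetical labels)
        final Map<String, Type> fields; // nested fields, non-empty only for ROW
        Type(String name, Map<String, Type> fields) { this.name = name; this.fields = fields; }
        static Type atomic(String name) { return new Type(name, Collections.emptyMap()); }
    }

    private final Set<String> allColumnNames = new HashSet<>();

    /** Registers a column and, for ROW types, its nested fields qualified with ".". */
    void addColumn(String columnName, Type type) {
        if (!allColumnNames.add(columnName)) {
            throw new IllegalArgumentException("Duplicate column name: " + columnName);
        }
        if ("ROW".equals(type.name)) {
            // Recurse only into structured ROW types. Nested fields of ARRAY/MULTISET
            // are not registered here, so references to them fail the contains() check.
            for (Map.Entry<String, Type> field : type.fields.entrySet()) {
                addColumn(columnName + "." + field.getKey(), field.getValue());
            }
        }
    }

    /** Returns true if the (possibly "."-qualified) column name was registered. */
    boolean contains(String columnName) {
        return allColumnNames.contains(columnName);
    }
}
```

Matching `ARRAY` and `MULTISET` explicitly, as the comment suggests, would replace the silent skip in the recursion with a dedicated branch that fails with a readable validation error instead of a late, obscure lookup failure.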
[GitHub] [flink] zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#discussion_r338371293 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -412,13 +412,17 @@ private static Throwable maybeWrapWithNoResourceAvailableException(final Throwab }; } - private void stopDeployment(final DeploymentHandle deploymentHandle) { - cancelExecutionVertex(deploymentHandle.getExecutionVertexId()); + private void releaseUnassignedSlotIfPresent(final DeploymentHandle deploymentHandle) { // Canceling the vertex normally releases the slot. However, we might not have assigned // the slot to the vertex yet. + // Only release unassigned slot to guarantee no vertex state change happens here. deploymentHandle .getLogicalSlot() - .ifPresent(logicalSlot -> logicalSlot.releaseSlot(null)); + .ifPresent(logicalSlot -> { + if (logicalSlot.getPayload() != null) { Review comment: I gave some more thoughts on it and think we can even remove this release logic, since an unassigned slot will never get released here. - if a slot is assigned with a payload (iff it is assigned to an execution or is released), there's no need to release it here - if a slot is not assigned and needs a release: - a) in `assignResourceOrHandleError`'s outdated deployment handling block. This cannot happen, because the vertex is outdated iff it has been restarted, and the slot request will be canceled then - b) in `deployOrHandleError`'s outdated deployment handling block. This happens iff the preceding `assignResourceOrHandleError` of the same vertex finished without a successful assignment, which means - b.1) case a) happened, but that is not possible - b.2) an unexpected error happened in `assignResourceOrHandleError`.
But `deployOrHandleError` cannot be invoked in this case, since `deployAll` will propagate the unexpected error to force a JM restart (ignoring `deployIndividually`, which will be removed in FLINK-14162; see discussion [here](https://github.com/apache/flink/pull/9860#discussion_r334367314)). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
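The guarded release being discussed can be modeled in isolation. The sketch below is a toy model of the stated intent of the change ("only release unassigned slot to guarantee no vertex state change happens here"), using placeholder classes rather than Flink's real `LogicalSlot` and `DeploymentHandle`:

```java
import java.util.Optional;

/** Toy model of "release only unassigned slots"; not Flink's actual scheduler classes. */
class SlotReleaseSketch {

    static final class LogicalSlot {
        private final Object payload;   // null means no execution was assigned yet
        private boolean released;
        LogicalSlot(Object payload) { this.payload = payload; }
        Object getPayload() { return payload; }
        void releaseSlot() { released = true; }
        boolean isReleased() { return released; }
    }

    /**
     * Releases the slot only when no payload was assigned, so an execution that
     * already owns the slot is never affected by an outdated deployment.
     */
    static void releaseUnassignedSlotIfPresent(Optional<LogicalSlot> maybeSlot) {
        maybeSlot.ifPresent(slot -> {
            if (slot.getPayload() == null) {
                slot.releaseSlot();
            }
        });
    }
}
```

zhuzhurk's case analysis above then argues that even this guarded release is dead code in practice, because by the time an outdated deployment is handled the slot is either already assigned or its request already canceled.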
[jira] [Created] (FLINK-14515) Implement LimitableTableSource for FileSystemTableFactory
Jingsong Lee created FLINK-14515: Summary: Implement LimitableTableSource for FileSystemTableFactory Key: FLINK-14515 URL: https://issues.apache.org/jira/browse/FLINK-14515 Project: Flink Issue Type: Sub-task Components: Connectors / Common Reporter: Jingsong Lee Fix For: 1.10.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9981: [FLINK-13195][sql-client] Add create table support for SqlClient
flinkbot edited a comment on issue #9981: [FLINK-13195][sql-client] Add create table support for SqlClient URL: https://github.com/apache/flink/pull/9981#issuecomment-545714180 ## CI report: * 52a10e0283f21047ac18d6668b71cfa4f6fc21a9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133304303) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wangxlong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner
wangxlong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner URL: https://github.com/apache/flink/pull/9916#discussion_r338371307 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala ## @@ -164,3 +180,96 @@ class ExpressionReducer(config: TableConfig) } } } + +/** + * A [[ConstantFunctionContext]] allows to obtain user-defined configuration information set + * in [[TableConfig]]. + * + * @param parameters User-defined configuration set in [[TableConfig]]. + */ +class ConstantFunctionContext(parameters: Configuration) extends FunctionContext(null) { + + override def getMetricGroup: MetricGroup = { +throw new UnsupportedOperationException("getMetricGroup is not supported when optimizing") + } + + override def getCachedFile(name: String): File = { +throw new UnsupportedOperationException("getCachedFile is not supported when optimizing") + } + + /** +* Gets the user-defined configuration value associated with the given key as a string. +* +* @param key key pointing to the associated value +* @param defaultValue default value which is returned in case user-defined configuration +* value is null or there is no value associated with the given key +* @return (default) value associated with the given key +*/ + override def getJobParameter(key: String, defaultValue: String): String = { +parameters.getString(key, defaultValue) + } +} + +/** + * A [[ConstantFunctionCodeGenerator]] used for constant expression code generator + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. 
+ * @param input1 type information about the first input of the Function + * @param functionContext functionContext that used for code generator + * @param parameters parameters that accessed by openFunction + */ +class ConstantFunctionCodeGenerator(config: TableConfig, +nullableInput: Boolean, +input1: TypeInformation[_ <: Any], +functionContext: FunctionContext, +parameters: String = null) + extends FunctionCodeGenerator(config, nullableInput, input1) { + + override def addReusableFunction(function: UserDefinedFunction, Review comment: Yes, I think this will be better This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
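The `getJobParameter` override quoted in the diff above is a plain keyed lookup with a default. A minimal stand-alone sketch of that behavior (a `Map` in place of Flink's `Configuration`, with hypothetical keys) looks like this:

```java
import java.util.HashMap;
import java.util.Map;

/** Map-backed stand-in for the Configuration lookup in ConstantFunctionContext. */
class JobParameterSketch {

    private final Map<String, String> parameters = new HashMap<>();

    void set(String key, String value) {
        parameters.put(key, value);
    }

    /** Returns the value for the key, or defaultValue when the key is unset. */
    String getJobParameter(String key, String defaultValue) {
        return parameters.getOrDefault(key, defaultValue);
    }
}
```

This is why constant-expression reduction can still honor user-defined job parameters at optimization time, while the other context methods (metric group, cached files) legitimately throw `UnsupportedOperationException` there.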
[GitHub] [flink] flinkbot edited a comment on issue #9980: [WIP][FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods
flinkbot edited a comment on issue #9980: [WIP][FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods URL: https://github.com/apache/flink/pull/9980#issuecomment-545692886 ## CI report: * 413eea83e59307f0709a9d4e055dcd9015c846d5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133297914) * c249ada8fb90b1625778e37ca91fbd1dc361417f : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133308181) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #9927: [FLINK-14397][hive] Failed to run Hive UDTF with array arguments
bowenli86 commented on issue #9927: [FLINK-14397][hive] Failed to run Hive UDTF with array arguments URL: https://github.com/apache/flink/pull/9927#issuecomment-545731315 LGTM! Merging This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9977: [FLINK-14497][python] Support primitive data types in Python user-defined functions
flinkbot edited a comment on issue #9977: [FLINK-14497][python] Support primitive data types in Python user-defined functions URL: https://github.com/apache/flink/pull/9977#issuecomment-545444286 ## CI report: * 89d33a5aeefd6934261cc185c306c2e55b6c8ad2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133195423) * 982528a22bd2cfdfdb367561464079946d158a4a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133219672) * e8feba0292bd3bd15320ba91f1ee1ef61aaf540f : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133308166) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9894: [FLINK-14342][table][python] Remove method FunctionDefinition#getLanguage.
flinkbot edited a comment on issue #9894: [FLINK-14342][table][python] Remove method FunctionDefinition#getLanguage. URL: https://github.com/apache/flink/pull/9894#issuecomment-541559889 ## CI report: * 89595e0af27e0ae61f0fcd956ec422f203a3ab95 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131751296) * b1aed08d2bbeaf73f6dc2a293f912b368319531b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131758748) * e9c95e6f51ff45b69df5d248782921aa8e6edaab : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132077078) * 9c05be8a14dbf6b31fd24f6114be36e1adec3b3a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132082371) * a27700b7e163407788a4aac0d66d18d0c0efdefc : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133308152) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#issuecomment-539878774 ## CI report: * c8bf66050c865904c402bdc9c079a6e4de1a064c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131080663) * ec84f32bdcae0738d97ac60090691789334a279a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133306001) * 6a12e5941baf73958146f54365ff3f267c696e11 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al…
flinkbot edited a comment on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al… URL: https://github.com/apache/flink/pull/9300#issuecomment-516849331 ## CI report: * 9b5b5ff5df053498e491d43a04f44d5ba452579c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/121415160) * eb6cf333f4331ed20d7e22a056cbd3c9b61f31f8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124204150) * 6440190df24a227f96e2b917acecccee04ab981b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128458494) * de471a0446423d026ae75476f1e4892126668a40 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128501969) * c377de8dd42673a3bd229e9e05e28cbd1c1863d7 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129638886) * 5dde9734d42c1f427ad574a4d5fdbd4bd4f35fdd : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132989695) * 9dd25a6341c3890794f3c327b9e94589fee70f52 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133133237) * f3a7855f74b9708e25e19fcca58e18abb64614e2 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133308139)
[GitHub] [flink] JingsongLi commented on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
JingsongLi commented on issue #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#issuecomment-545728276 > Thanks for your PR. I leave some comments. > btw: the last 3 commit's have wrong commit msg (table-planner-planner -> table-planner-blink). Thanks for your review, updated.
[GitHub] [flink] jinglining commented on a change in pull request #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
jinglining commented on a change in pull request #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#discussion_r338368266 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java ## @@ -71,7 +72,11 @@ public JobExceptionsHandler( } @Override - protected JobExceptionsInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) { + protected JobExceptionsInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) { + List sizes = request.getQueryParameter(ExceptionShowSizeParameter.class); + if (sizes != null && sizes.size() == 1) { Review comment: Because MessageQueryParameter's value type is List, `sizes` has to be declared as a List, which is awkward here. Maybe we could split MessageQueryParameter into MessageQueryListParameter and MessageQueryObjectParameter?
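One possible shape for that suggested split — a sketch only; all class and method names below are hypothetical, and the real `MessageQueryParameter` in flink-runtime has a richer interface:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical single-valued base: one raw query string maps to one typed value. */
abstract class MessageQueryObjectParameter<X> {
    abstract X convertStringToValue(String value);

    X resolve(String raw) {
        return convertStringToValue(raw);
    }
}

/** Hypothetical list variant: a comma-separated raw string maps to a List of values. */
abstract class MessageQueryListParameter<X> extends MessageQueryObjectParameter<List<X>> {
    abstract X convertElement(String value);

    @Override
    List<X> convertStringToValue(String value) {
        // split on "," and convert each element individually
        return Arrays.stream(value.split(","))
                .map(this::convertElement)
                .collect(Collectors.toList());
    }
}

/** With that split, a "size"-style parameter can resolve to a plain Integer, not a List. */
class SizeParameter extends MessageQueryObjectParameter<Integer> {
    @Override
    Integer convertStringToValue(String value) {
        return Integer.valueOf(value);
    }
}
```

Handlers that expect a single value would then call the single-valued variant directly instead of unwrapping a one-element list.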
[jira] [Assigned] (FLINK-14270) new web ui should display more than 4 metrics
[ https://issues.apache.org/jira/browse/FLINK-14270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-14270: --- Assignee: Yadong Xie > new web ui should display more than 4 metrics > - > > Key: FLINK-14270 > URL: https://issues.apache.org/jira/browse/FLINK-14270 > Project: Flink > Issue Type: New Feature > Components: Runtime / Web Frontend >Affects Versions: 1.9.0 >Reporter: David Anderson >Assignee: Yadong Xie >Priority: Major > Attachments: input-metrics-all-zero.png > > > The old web UI can display at least 9 metrics at once, and this can be > valuable. > The new interface is limited to 4 metrics, which is not enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] guoweiM commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever
guoweiM commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever URL: https://github.com/apache/flink/pull/9950#discussion_r338367848 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; + +/** + * Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on. 
+ */ +public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever { + + protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class); + + /** The directory contains all the jars, which user code depends on. */ + @Nullable + private final String jobDir; + + private List userClassPaths; + + public AbstractUserClassPathJobGraphRetriever(String jobDir) { + this.jobDir = jobDir; + } + + protected List getUserClassPaths() throws IOException { + if (userClassPaths == null) { + userClassPaths = getRelativeJarsURLFromDir(jobDir); + } + return userClassPaths; + } + + /** +* Scan all the jar files in the {@code dir} and return all these jar files' relative URLs to "user.dir". +* @param dir the dir needed to scan the jar files +* @return the jar files' relative URLs +* @throws IOException +*/ + private List getRelativeJarsURLFromDir(String dir) throws IOException { Review comment: After an offline discussion with Till, this method will be broken down into three methods: `List listFilesInPath(File directory, Predicate fileFilter)`, `List relativizeToWorkingDir(List)`, and `List toRelativeURL(List)`.
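The three proposed helpers could look roughly like the following — a sketch under the assumption (taken from the original javadoc) that relative URLs are built against the working directory "user.dir"; the final signatures in the PR may differ:

```java
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class JarScanning {

    /** Lists all regular files under the directory that match the filter. */
    static List<File> listFilesInPath(File directory, Predicate<File> fileFilter) throws IOException {
        try (Stream<Path> stream = Files.walk(directory.toPath())) {
            return stream
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(fileFilter)
                    .collect(Collectors.toList());
        }
    }

    /** Relativizes each file against the current working directory ("user.dir").
     *  Note: Path.relativize throws if the paths have different roots. */
    static List<Path> relativizeToWorkingDir(List<File> files) {
        Path workingDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath();
        List<Path> relative = new ArrayList<>(files.size());
        for (File file : files) {
            relative.add(workingDir.relativize(file.toPath().toAbsolutePath()));
        }
        return relative;
    }

    /** Converts relative paths to URLs whose file part stays relative. */
    static List<URL> toRelativeURL(List<Path> relativePaths) throws MalformedURLException {
        List<URL> urls = new ArrayList<>(relativePaths.size());
        for (Path path : relativePaths) {
            // new URL("file", null, path) keeps the path relative in the URL's file part
            urls.add(new URL("file", null, path.toString()));
        }
        return urls;
    }
}
```

Splitting the scan this way also makes each step independently testable, which is presumably part of the motivation for the refactoring.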
[GitHub] [flink] JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#discussion_r338367399 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala ## @@ -80,23 +81,12 @@ object TableSinkUtils { "field names via 'getPartitionFieldNames()' method." sink match { case pts: PartitionableTableSink => - val partitionFields = pts.getPartitionFieldNames - if (partitionFields == null || partitionFields.isEmpty) { -throw new ValidationException(invalidMsg) - } staticPartitions.map(_._1) foreach { p => -if (!partitionFields.contains(p)) { +if (!partitionKeys.contains(p)) { throw new ValidationException(s"Static partition column $p " + -s"should be in the partition fields list $partitionFields.") +s"should be in the partition fields list $partitionKeys.") } } - staticPartitions.map(_._1).zip(partitionFields).foreach { Review comment: Yes, this depends on the implementation; we can support arbitrary order.
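The diff above replaces the removed order-sensitive `zip` check with a plain membership test. In Java terms the relaxed validation amounts to something like this sketch (names are illustrative; the real code is the Scala shown above):

```java
import java.util.List;
import java.util.Set;

/** Sketch: static partition columns must be a subset of the declared
 *  partition keys, but their order no longer has to match. */
final class StaticPartitionValidation {

    static void validate(Set<String> staticPartitionColumns, List<String> partitionKeys) {
        for (String p : staticPartitionColumns) {
            if (!partitionKeys.contains(p)) {
                throw new IllegalArgumentException(
                        "Static partition column " + p
                                + " should be in the partition fields list " + partitionKeys + ".");
            }
        }
    }
}
```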
[GitHub] [flink] jinglining commented on a change in pull request #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
jinglining commented on a change in pull request #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#discussion_r338367329 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/ExceptionShowSizeParameter.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +/** + * @see org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler + * show size for JobExceptionsHandler + */ +public class ExceptionShowSizeParameter extends MessageQueryParameter { + + private static final String QUERY_PARAMETER_NAME = "size"; Review comment: How about limit?
[GitHub] [flink] JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#discussion_r338367203 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala ## @@ -311,3 +283,93 @@ object PartitionableSinkITCase { row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?") ) } + +private class TestSink( +rowType: RowTypeInfo, +supportsGrouping: Boolean, +partitionColumns: Array[String]) +extends StreamTableSink[Row] +with PartitionableTableSink { + private var staticPartitions: JMap[String, String] = _ + + override def getPartitionFieldNames: JList[String] = partitionColumns.toList + + override def setStaticPartition(partitions: JMap[String, String]): Unit = +this.staticPartitions = partitions + + override def configure(fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this + + override def configurePartitionGrouping(s: Boolean): Boolean = { +supportsGrouping + } + + override def getTableSchema: TableSchema = { +new TableSchema(Array("a", "b", "c"), type3.getFieldTypes) + } + + override def getOutputType: RowTypeInfo = type3 + + override def emitDataStream(dataStream: DataStream[Row]): Unit = { +dataStream.addSink(new UnsafeMemorySinkFunction(type3)) +.setParallelism(dataStream.getParallelism) + } + + override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { +dataStream.addSink(new UnsafeMemorySinkFunction(type3)) +.setParallelism(dataStream.getParallelism) + } + + def getStaticPartitions: JMap[String, String] = { +staticPartitions + } +} + +class TestPartitionableSinkFactory extends TableSinkFactory[Row] with TableSourceFactory[Row] { + + override def requiredContext(): util.Map[String, String] = { +val context = new util.HashMap[String, String]() +context.put(CONNECTOR_TYPE, "TestPartitionableSink") +context + } + + 
override def supportedProperties(): util.List[String] = { +val supported = new util.ArrayList[String]() +supported.add("*") +supported + } + + override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = { +val dp = new DescriptorProperties() +dp.putProperties(properties) + +val schema = dp.getTableSchema(SCHEMA) +val supportsGrouping = dp.getBoolean("supports-grouping") +val partitionColumns = dp.getArray("partition-column", new function.Function[String, String] { + override def apply(t: String): String = dp.getString(t) +}) +new TestSink( + schema.toRowType.asInstanceOf[RowTypeInfo], + supportsGrouping, + partitionColumns.asScala.toArray[String]) + } + + /** +* Remove it after FLINK-14387. +*/ + override def createTableSource(properties: JMap[String, String]): TableSource[Row] = { Review comment: Yes, only used for `getTableSchema`.
[GitHub] [flink] JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#discussion_r338366730 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala ## @@ -140,9 +94,7 @@ class PartitionableSinkITCase { "1,4,你好,陌生人,我是", "1,4,你好,陌生人,我是中国人", "1,4,你好,陌生人,我是中国人,你来自哪里?"), - RESULT1.toList) -assert(RESULT2.isEmpty) -assert(RESULT3.isEmpty) + RESULT.toList) } @Test Review comment: No — we only removed dynamic-partition shuffle support in the legacy planner. The shuffle is only an optimization, so the results should stay the same.
[GitHub] [flink] flinkbot edited a comment on issue #9977: [FLINK-14497][python] Support primitive data types in Python user-defined functions
flinkbot edited a comment on issue #9977: [FLINK-14497][python] Support primitive data types in Python user-defined functions URL: https://github.com/apache/flink/pull/9977#issuecomment-545444286 ## CI report: * 89d33a5aeefd6934261cc185c306c2e55b6c8ad2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133195423) * 982528a22bd2cfdfdb367561464079946d158a4a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133219672) * e8feba0292bd3bd15320ba91f1ee1ef61aaf540f : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #9980: [WIP][FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods
flinkbot edited a comment on issue #9980: [WIP][FLINK-14398][SQL/Legacy Planner]Further split input unboxing code into separate methods URL: https://github.com/apache/flink/pull/9980#issuecomment-545692886 ## CI report: * 413eea83e59307f0709a9d4e055dcd9015c846d5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133297914) * c249ada8fb90b1625778e37ca91fbd1dc361417f : UNKNOWN
[jira] [Updated] (FLINK-14508) google protobuf is not shaded
[ https://issues.apache.org/jira/browse/FLINK-14508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YING HOU updated FLINK-14508: - Attachment: (was: image-2019-10-23-18-07-27-188.png) > google protobuf is not shaded > -- > > Key: FLINK-14508 > URL: https://issues.apache.org/jira/browse/FLINK-14508 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.8.0, 1.8.1, 1.8.2 >Reporter: YING HOU >Priority: Major > Attachments: image-2019-10-23-16-55-00-959.png > > > I try to use phoenix in my flink project. When I use > 'org.apache.phoenix.queryserver.client.Driver' as my jdbc driver which is > inherited from 'org.apache.calcite.avatica.remote.Driver', I got a > ClassNotFoundException as follow: > !image-2019-10-23-16-55-00-959.png! > > I guess the protobuf-java may not be shaded in the module flink-table-planner
[GitHub] [flink] JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink URL: https://github.com/apache/flink/pull/9909#discussion_r338366580 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala ## @@ -59,7 +59,7 @@ class PartitionableSinkITCase { @Before def before(): Unit = { -batchExec.setParallelism(3) +batchExec.setParallelism(1) Review comment: Now that we removed the shuffle on the dynamic-partition sink, with 3 sink instances the data distribution is random, but the tests in PartitionableSinkITCase depend on the shuffle, so I set the parallelism to 1 to keep the tests simple.
[jira] [Updated] (FLINK-14508) google protobuf is not shaded
[ https://issues.apache.org/jira/browse/FLINK-14508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YING HOU updated FLINK-14508: - Attachment: (was: pom.xml) > google protobuf is not shaded > -- > > Key: FLINK-14508 > URL: https://issues.apache.org/jira/browse/FLINK-14508 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.8.0, 1.8.1, 1.8.2 >Reporter: YING HOU >Priority: Major > Attachments: image-2019-10-23-16-55-00-959.png > > > I try to use phoenix in my flink project. When I use > 'org.apache.phoenix.queryserver.client.Driver' as my jdbc driver which is > inherited from 'org.apache.calcite.avatica.remote.Driver', I got a > ClassNotFoundException as follow: > !image-2019-10-23-16-55-00-959.png! > > I guess the protobuf-java may not be shaded in the module flink-table-planner
[jira] [Updated] (FLINK-14508) google protobuf is not shaded
[ https://issues.apache.org/jira/browse/FLINK-14508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YING HOU updated FLINK-14508: - Description: I try to use phoenix in my flink project. When I use 'org.apache.phoenix.queryserver.client.Driver' as my jdbc driver which is inherited from 'org.apache.calcite.avatica.remote.Driver', I got a ClassNotFoundException as follow: !image-2019-10-23-16-55-00-959.png! I guess the protobuf-java may not be shaded in the module flink-table-planner was: I try to use phoenix in my flink project. When I use 'org.apache.phoenix.queryserver.client.Driver' as my jdbc driver which is inherited from 'org.apache.calcite.avatica.remote.Driver', I got a ClassNotFoundException as follow: !image-2019-10-23-16-55-00-959.png! I guess the protobuf-java may not be shaded in the module flink-table-planner So, I add something in pom.xml. !image-2019-10-23-18-07-27-188.png! > google protobuf is not shaded > -- > > Key: FLINK-14508 > URL: https://issues.apache.org/jira/browse/FLINK-14508 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.8.0, 1.8.1, 1.8.2 >Reporter: YING HOU >Priority: Major > Attachments: image-2019-10-23-16-55-00-959.png, > image-2019-10-23-18-07-27-188.png, pom.xml > > > I try to use phoenix in my flink project. When I use > 'org.apache.phoenix.queryserver.client.Driver' as my jdbc driver which is > inherited from 'org.apache.calcite.avatica.remote.Driver', I got a > ClassNotFoundException as follow: > !image-2019-10-23-16-55-00-959.png! > > I guess the protobuf-java may not be shaded in the module flink-table-planner
[GitHub] [flink] flinkbot edited a comment on issue #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
flinkbot edited a comment on issue #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#issuecomment-542062879 ## CI report: * efdb2ad3942939590bbb0203ab04bae5fec634a9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131912243) * a30831a94681f1332648b34ce2978f883d4f33d5 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133306011)
[GitHub] [flink] flinkbot edited a comment on issue #9894: [FLINK-14342][table][python] Remove method FunctionDefinition#getLanguage.
flinkbot edited a comment on issue #9894: [FLINK-14342][table][python] Remove method FunctionDefinition#getLanguage. URL: https://github.com/apache/flink/pull/9894#issuecomment-541559889 ## CI report: * 89595e0af27e0ae61f0fcd956ec422f203a3ab95 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131751296) * b1aed08d2bbeaf73f6dc2a293f912b368319531b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131758748) * e9c95e6f51ff45b69df5d248782921aa8e6edaab : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132077078) * 9c05be8a14dbf6b31fd24f6114be36e1adec3b3a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132082371) * a27700b7e163407788a4aac0d66d18d0c0efdefc : UNKNOWN
[GitHub] [flink] wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338364936 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + Review comment: This is not only used for field-name checking, but also for the rowtime field reference check. During column expression conversion there is still no validation of the rowtime field. We need to validate that the rowtime field actually exists, even when it is a nested field.
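The recursive registration that makes such nested lookups cheap can be sketched as follows — a simplified, hypothetical stand-in for the PR's `ColumnValidator`, where `ColType` replaces the real `SqlDataTypeSpec`; nested fields are registered under dot-qualified names so a rowtime reference like "f0.ts" is a plain set lookup:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ColumnRegistry {

    /** Minimal stand-in for SqlDataTypeSpec: fields == null means an atomic type. */
    static final class ColType {
        final Map<String, ColType> fields;

        ColType(Map<String, ColType> fields) {
            this.fields = fields;
        }
    }

    private final Set<String> allColumnNames = new HashSet<>();

    /** Registers a column; for row types, recurses and qualifies nested names with ".". */
    void addColumn(String name, ColType type) {
        allColumnNames.add(name);
        if (type.fields != null) {
            for (Map.Entry<String, ColType> e : type.fields.entrySet()) {
                addColumn(name + "." + e.getKey(), e.getValue());
            }
        }
    }

    /** True if the (possibly dot-qualified) column name was registered. */
    boolean contains(String columnName) {
        return allColumnNames.contains(columnName);
    }
}
```

With this scheme the watermark's rowtime reference — nested or not — can be validated with a single `contains` call after all columns are registered.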
[GitHub] [flink] flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#issuecomment-539878774 ## CI report: * c8bf66050c865904c402bdc9c079a6e4de1a064c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131080663) * ec84f32bdcae0738d97ac60090691789334a279a : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133306001)
[GitHub] [flink] wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
wuchong commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338364552 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlWatermark.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + + +/** + * Watermark statement in CREATE TABLE DDL, e.g. {@code WATERMARK FOR ts AS ts - INTERVAL '5' SECOND}. + */ Review comment: I don't think so. This is not a public API, we don't need to list the full syntax. We will add the full syntax and examples in documentation. 
We didn't add full-syntax javadoc on {{SqlCreateTable}} either.
[GitHub] [flink] flinkbot edited a comment on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al…
flinkbot edited a comment on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al… URL: https://github.com/apache/flink/pull/9300#issuecomment-516849331 ## CI report: * 9b5b5ff5df053498e491d43a04f44d5ba452579c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/121415160) * eb6cf333f4331ed20d7e22a056cbd3c9b61f31f8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124204150) * 6440190df24a227f96e2b917acecccee04ab981b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128458494) * de471a0446423d026ae75476f1e4892126668a40 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128501969) * c377de8dd42673a3bd229e9e05e28cbd1c1863d7 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129638886) * 5dde9734d42c1f427ad574a4d5fdbd4bd4f35fdd : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132989695) * 9dd25a6341c3890794f3c327b9e94589fee70f52 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133133237) * f3a7855f74b9708e25e19fcca58e18abb64614e2 : UNKNOWN
[GitHub] [flink] wuchong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner
wuchong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner URL: https://github.com/apache/flink/pull/9916#discussion_r338363743 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala ## @@ -94,8 +99,15 @@ class ExpressionReducer(config: TableConfig) val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType)) val resultType = new RowTypeInfo(literalTypes: _*) +val parameters = if (config.getConfiguration != null) { + config.getConfiguration Review comment: Currently, we use `ExecutionConfig#getJobParameter` as the UDF's configuration at runtime. However, I think we should use `TableConfig#getConfiguration`. I opened another issue, FLINK-14514, to discuss this.
[GitHub] [flink] xuyang1706 commented on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al…
xuyang1706 commented on issue #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al… URL: https://github.com/apache/flink/pull/9300#issuecomment-545722380 > Hi @xuyang1706 @becketqin Thanks for the discussion and sorry for the late response. This PR currently seems to be far away from the original intent of the [JIRA](https://issues.apache.org/jira/browse/FLINK-13513). I would suggest please change the title as well as the content for the commit message to make it more clear that we only support `ModelMapper` and not FlatMapper. Thanks for your suggestion, @walterddr . I have changed the title.
[GitHub] [flink] wuchong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner
wuchong commented on a change in pull request #9916: [FLINK-14408][Table-Planner]UDF's open method is invoked to initialize when sql is optimized in oldPlanner URL: https://github.com/apache/flink/pull/9916#discussion_r338355369 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala ## @@ -164,3 +180,96 @@ class ExpressionReducer(config: TableConfig) } } } + +/** + * A [[ConstantFunctionContext]] allows to obtain user-defined configuration information set + * in [[TableConfig]]. + * + * @param parameters User-defined configuration set in [[TableConfig]]. + */ +class ConstantFunctionContext(parameters: Configuration) extends FunctionContext(null) { + + override def getMetricGroup: MetricGroup = { +throw new UnsupportedOperationException("getMetricGroup is not supported when optimizing") + } + + override def getCachedFile(name: String): File = { +throw new UnsupportedOperationException("getCachedFile is not supported when optimizing") + } + + /** +* Gets the user-defined configuration value associated with the given key as a string. +* +* @param key key pointing to the associated value +* @param defaultValue default value which is returned in case user-defined configuration +* value is null or there is no value associated with the given key +* @return (default) value associated with the given key +*/ + override def getJobParameter(key: String, defaultValue: String): String = { +parameters.getString(key, defaultValue) + } +} + +/** + * A [[ConstantFunctionCodeGenerator]] used for constant expression code generator + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. 
+ * @param input1 type information about the first input of the Function + * @param functionContext functionContext that used for code generator + * @param parameters parameters that accessed by openFunction + */ +class ConstantFunctionCodeGenerator(config: TableConfig, +nullableInput: Boolean, +input1: TypeInformation[_ <: Any], +functionContext: FunctionContext, +parameters: String = null) + extends FunctionCodeGenerator(config, nullableInput, input1) { + + override def addReusableFunction(function: UserDefinedFunction, Review comment: The code of this method duplicates the super class. Could we use a similar approach to the blink planner? Add a `functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext],` optional parameter.
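The `ConstantFunctionContext` quoted above reduces `getJobParameter` to a keyed lookup with a default over configuration captured at planning time. A minimal standalone sketch of that lookup, assuming a plain `Map` in place of Flink's `Configuration` (the class name here is made up, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified analogue of ConstantFunctionContext: job parameters are served
// from a fixed key-value store instead of the runtime ExecutionConfig.
class ConstantParameters {
    private final Map<String, String> parameters;

    ConstantParameters(Map<String, String> parameters) {
        this.parameters = parameters;
    }

    // Returns the value for the key, or the default when the key is absent.
    String getJobParameter(String key, String defaultValue) {
        return parameters.getOrDefault(key, defaultValue);
    }
}
```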
[GitHub] [flink] JingsongLi commented on issue #9884: [FLINK-14266][table] Introduce RowCsvInputFormat to new CSV module
JingsongLi commented on issue #9884: [FLINK-14266][table] Introduce RowCsvInputFormat to new CSV module URL: https://github.com/apache/flink/pull/9884#issuecomment-545721995 > Btw did you have a look at #4660. Isn't this issue a duplicate? The differences are: - This PR is in flink-csv instead of flink-table. - This PR is consistent with the existing (de)serialization schema. - This PR deals with escaping characters with line delimiter. There are some other reasons why I put forward this PR: - #4660 has not been updated for quite some time. - This PR is just a small part of #4660. #4660 can be cut into parts.
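The third difference, "escaping characters with line delimiter", is the classic CSV wrinkle: a record delimiter inside a quoted field must not split the record. A hypothetical sketch of that splitting rule (not the PR's actual implementation; it also ignores `""` quote escapes for brevity):

```java
import java.util.ArrayList;
import java.util.List;

class CsvRecordSplitter {
    // Splits raw CSV text into records on '\n', but ignores newlines that
    // appear inside double-quoted fields.
    static List<String> splitRecords(String text) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : text.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes; // naive toggle; real parsers also handle "" escapes
                current.append(c);
            } else if (c == '\n' && !inQuotes) {
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }
}
```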
[jira] [Commented] (FLINK-14514) Improve interface of FunctionContext#getJobParameter
[ https://issues.apache.org/jira/browse/FLINK-14514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958478#comment-16958478 ] Jark Wu commented on FLINK-14514: - cc [~twalthr] [~aljoscha] > Improve interface of FunctionContext#getJobParameter > > > Key: FLINK-14514 > URL: https://issues.apache.org/jira/browse/FLINK-14514 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Jark Wu >Priority: Major > > Currently, {{FunctionContext#getJobParameter}} gets the key-values from > env's ExecutionConfig, but during expression reducing, it gets key-values > from {{TableConfig#getConfiguration}} . As we are aiming to not expose > underlying env which means users can't access ExecutionConfig in the future. > So I propose to get from {{TableConfig#getConfiguration}} and make it clear > in javadoc. > This might be an incompatible way, but considering it is not heavily used, I > think it's fine. We can add a release note for this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14514) Improve interface of FunctionContext#getJobParameter
[ https://issues.apache.org/jira/browse/FLINK-14514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-14514: Description: Currently, {{FunctionContext#getJobParameter}} gets the key-values from env's ExecutionConfig, but during expression reducing, it gets key-values from {{TableConfig#getConfiguration}} . As we are aiming to not expose underlying env which means users can't access ExecutionConfig in the future. So I propose to get from {{TableConfig#getConfiguration}} and make it clear in javadoc. This might be an incompatible way, but considering it is not heavily used, I think it's fine. We can add a release note for this. was:Currently, {{FunctionContext#getJobParameter}} gets the key-values from env's ExecutionConfig, but during expression reducing, it gets key-values from {{TableConfig#getConfiguration}} . As we are aiming to not expose underlying env which means users can't access ExecutionConfig in the future. So I propose to get from {{TableConfig#getConfiguration}} and make it clear in javadoc. > Improve interface of FunctionContext#getJobParameter > > > Key: FLINK-14514 > URL: https://issues.apache.org/jira/browse/FLINK-14514 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Jark Wu >Priority: Major > > Currently, {{FunctionContext#getJobParameter}} gets the key-values from > env's ExecutionConfig, but during expression reducing, it gets key-values > from {{TableConfig#getConfiguration}} . As we are aiming to not expose > underlying env which means users can't access ExecutionConfig in the future. > So I propose to get from {{TableConfig#getConfiguration}} and make it clear > in javadoc. > This might be an incompatible way, but considering it is not heavily used, I > think it's fine. We can add a release note for this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338363402 ## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl ## @@ -27,9 +27,31 @@ void TableColumn(TableCreationContext context) : UniqueKey(context.uniqueKeysList) | ComputedColumn(context) +| +Watermark(context) ) } +void Watermark(TableCreationContext context) : +{ +SqlIdentifier columnName; +SqlParserPos pos; +SqlNode watermarkStrategy; +} +{ + {pos = getPos();} +columnName = CompoundIdentifier() + +watermarkStrategy = Expression(ExprContext.ACCEPT_NON_QUERY) { Review comment: We may also need to add a test case for the invalid sub-query format.
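For reference, the statement shape this grammar accepts (and the invalid sub-query shape the requested test case should reject) looks roughly like the following; the table and column names are made up, and only the valid WATERMARK expression is taken from the PR's javadoc:

```sql
CREATE TABLE orders (
  order_id BIGINT,
  ts TIMESTAMP(3),
  -- valid: the watermark strategy is a plain expression over a column
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
);

-- invalid: a sub-query is not accepted as a watermark strategy
-- WATERMARK FOR ts AS (SELECT MAX(ts) FROM orders)
```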
[GitHub] [flink] danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338363242 ## File path: flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties ## @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Resources for the Apache Calcite project. +# See wrapper class org.apache.calcite.runtime.CalciteResource. +# +MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported yet. +OverwriteIsOnlyAllowedForHive=OVERWRITE expression is only allowed for HIVE dialect. +OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT statement. Review comment: That's the point, thanks for the refactoring.
[GitHub] [flink] danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338363168 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlWatermark.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + + +/** + * Watermark statement in CREATE TABLE DDL, e.g. {@code WATERMARK FOR ts AS ts - INTERVAL '5' SECOND}. + */ Review comment: Maybe we can list the full supported syntax in the Javadoc.
[GitHub] [flink] danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338362915 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // - + + private static final class ColumnValidator { + + private final Set allColumnNames = new HashSet<>(); + Review comment: The `ColumnValidator` confuses me a little. I'm wondering whether we need the nested field duplicate checking. During this phase, all we can really do is check names; during the column expression conversion (SqlNode to RexNode) we will get a full validation anyway. Do you think this check is necessary? It seems a little redundant to me.
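The check under discussion is essentially duplicate detection over "."-qualified column names. A standalone sketch of that idea, with the separator convention taken from the quoted code and everything else illustrative:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative analogue of ColumnValidator: registers (possibly nested,
// "."-qualified) column names and detects duplicates.
class ColumnNameRegistry {
    private final Set<String> allColumnNames = new HashSet<>();

    // Registers a qualified column name.
    // Returns false when the name was already registered, i.e. a duplicate.
    boolean addColumn(String qualifiedName) {
        return allColumnNames.add(qualifiedName);
    }

    boolean contains(String qualifiedName) {
        return allColumnNames.contains(qualifiedName);
    }
}
```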
[GitHub] [flink] xuyang1706 commented on a change in pull request #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al…
xuyang1706 commented on a change in pull request #9300: [FLINK-13513][ml] Add the FlatMapper and related classes for later al… URL: https://github.com/apache/flink/pull/9300#discussion_r338362984 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/mapper/ModelMapper.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.mapper; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.ml.common.model.ModelSource; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.List; + +/** + * Abstract class for mappers with model. + */ +public abstract class ModelMapper extends Mapper { + + /** +* Specify where to load model data. +*/ + private ModelSource modelSource; + + /** +* Field names of the model. +*/ + private final String[] modelFieldNames; + + /** +* Field types of the model. 
+*/ + private final DataType[] modelFieldTypes; + + public ModelMapper(TableSchema modelSchema, TableSchema dataSchema, Params params, ModelSource modelSource) { + super(dataSchema, params); + this.modelFieldNames = modelSchema.getFieldNames(); + this.modelFieldTypes = modelSchema.getFieldDataTypes(); + this.modelSource = modelSource; + } + + protected TableSchema getModelSchema() { + return TableSchema.builder().fields(this.modelFieldNames, this.modelFieldTypes).build(); + } + + /** +* Load model from the list of Row type data. +* +* @param modelRows the list of Row type data +*/ + public abstract void loadModel(List modelRows); + + @Override + public final void open(Configuration parameters) { + Preconditions.checkState(this.modelSource != null, "model source not set."); + List rows = this.modelSource.getModelRows(getRuntimeContext()); Review comment: We have updated the code.
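The `open()`/`loadModel()` pair quoted above is a two-phase initialization: fetch the model rows from the source, then hand them to the subclass. A stripped-down sketch of the pattern without any Flink types (all names here are illustrative):

```java
import java.util.List;

// Simplified analogue of ModelMapper: the base class owns the model source,
// the subclass turns raw rows into its in-memory model representation.
abstract class SimpleModelMapper<R> {
    private final List<R> modelRows; // stands in for ModelSource.getModelRows(...)

    SimpleModelMapper(List<R> modelRows) {
        this.modelRows = modelRows;
    }

    // Subclasses load their model from the raw rows.
    public abstract void loadModel(List<R> modelRows);

    // Mirrors ModelMapper.open(): validates the source, then loads the model.
    public final void open() {
        if (modelRows == null) {
            throw new IllegalStateException("model source not set.");
        }
        loadModel(modelRows);
    }
}
```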
[jira] [Updated] (FLINK-13513) Add the Mapper and related classes for later algorithm implementations.
[ https://issues.apache.org/jira/browse/FLINK-13513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xu Yang updated FLINK-13513: Description: Define the abstract classes for the single-thread executor for Row type data. * Add the definition of abstract class Mapper * Add abstract class ModelMapper extends Mapper was: Define the abstract classes for the single-thread executor for Row type data. * Add the definition of abstract class FlatMapper * Add interface MapOpInterface for _the map operation of Row type data._ * Add abstract class FlatModelMapper, Mapper extends FlatMapper * Add abstract class ModelMapper extends FlatModelMapper > Add the Mapper and related classes for later algorithm implementations. > -- > > Key: FLINK-13513 > URL: https://issues.apache.org/jira/browse/FLINK-13513 > Project: Flink > Issue Type: Sub-task > Components: Library / Machine Learning >Reporter: Xu Yang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Define the abstract classes for the single-thread executor for Row type data. > * Add the definition of abstract class Mapper > * Add abstract class ModelMapper extends Mapper -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r338360645 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. 
+ * The original limit will still be retained. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. +val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && +fetch != null + +var supportPushDown = false +if (onlyLimit) { + supportPushDown = call.rel(1).asInstanceOf[TableScan] + .getTable.unwrap(classOf[TableSourceTable[_]]) match { +case table: TableSourceTable[_] => + table.tableSource match { +case source: LimitableTableSource[_] => !source.isLimitPushedDown +case _ => false + } +case _ => false + } +} +supportPushDown + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val sort = call.rel(0).asInstanceOf[Sort] +val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan] +val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable] +val limit = RexLiteral.intValue(sort.fetch) +val relBuilder = call.builder() +val newRelOptTable = applyLimit(limit, relOptTable, relBuilder) +val newScan = scan.copy(scan.getTraitSet, newRelOptTable) Review comment: we should check whether the digest of the new scan has changed, just like [FLINK-12399](https://github.com/apache/flink/pull/8468)
[GitHub] [flink] godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r338359693 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. 
+ * The original limit will still be retained. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. +val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && +fetch != null + +var supportPushDown = false +if (onlyLimit) { + supportPushDown = call.rel(1).asInstanceOf[TableScan] + .getTable.unwrap(classOf[TableSourceTable[_]]) match { +case table: TableSourceTable[_] => + table.tableSource match { +case source: LimitableTableSource[_] => !source.isLimitPushedDown Review comment: we can't push the limit down if the table source is a `FilterableTableSource` and `isFilterPushedDown` is true. For example, the `predicate` in `ParquetTableSource` (which is a `FilterableTableSource`) can only filter row-groups, not individual records, so some `dirty` records may still be returned from the table source, and a pushed-down limit could then cut off `clean` records. Similarly, we should change the `matches` in `PushFilterIntoTableSourceScanRule` so that a filter can't be pushed down if the table source is a `LimitableTableSource` and `isLimitPushedDown` is true. The logic of `PushLimitIntoTableSourceScanRule` and `PushFilterIntoTableSourceScanRule` is coupled; it would be better to find a cleaner way.
[GitHub] [flink] godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r338361636 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.io.CollectionInputFormat +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.sources._ +import org.apache.flink.types.Row + +import scala.collection.JavaConverters._ + +/** + * The table source which support push-down the limit to the source. 
+ */ +class TestLimitableTableSource( +data: Seq[Row], +rowType: RowTypeInfo, +var limit: Long = -1, +var limitablePushedDown: Boolean = false) + extends StreamTableSource[Row] + with LimitableTableSource[Row] { + + override def isBounded = true + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { +if (limit == 0 && limit >= 0) { Review comment: `limit == 0 && limit >= 0` ??
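The flagged condition is self-contradictory: `limit == 0 && limit >= 0` reduces to `limit == 0`, so a positive pushed-down limit would never take effect in that branch. Presumably the intent was to truncate the data whenever a non-negative limit was pushed down, with `-1` as the "no limit" sentinel used in the quoted test source; a hedged sketch of that behavior (plain Java, names are made up):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class LimitedSource {
    // Applies a pushed-down limit: a non-negative limit truncates the data,
    // a negative limit (the "no limit" sentinel) returns it unchanged.
    static <T> List<T> applyLimit(List<T> data, long limit) {
        if (limit >= 0) {
            return data.stream().limit(limit).collect(Collectors.toList());
        }
        return data;
    }
}
```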
[GitHub] [flink] godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r338362044 ## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml ## @@ -19,30 +19,56 @@ limitations under the License. + Review comment: remove this blank line?
[GitHub] [flink] godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r338355400 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; + +/** + * Adds support for limiting push-down to a {@link TableSource}. + * A {@link TableSource} extending this interface is able to limit the number of records. + */ +@Experimental Review comment: Why is this interface experimental?
[GitHub] [flink] godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
godfreyhe commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r338362335 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala ## @@ -90,4 +96,28 @@ class LimitTest extends TableTestBase { util.verifyPlan("SELECT a, c FROM MyTable OFFSET 10 ROWS") } + @Test + def testFetchWithLimitSource(): Unit = { Review comment: please add a test with `order by`
[jira] [Updated] (FLINK-13513) Add the Mapper and related classes for later algorithm implementations.
[ https://issues.apache.org/jira/browse/FLINK-13513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xu Yang updated FLINK-13513: Summary: Add the Mapper and related classes for later algorithm implementations. (was: Add the FlatMapper and related classes for later algorithm implementations. ) > Add the Mapper and related classes for later algorithm implementations. > -- > > Key: FLINK-13513 > URL: https://issues.apache.org/jira/browse/FLINK-13513 > Project: Flink > Issue Type: Sub-task > Components: Library / Machine Learning >Reporter: Xu Yang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Define the abstract classes for the single-thread executor for Row type data. > * Add the definition of abstract class FlatMapper > * Add interface MapOpInterface for _the map operation of Row type data._ > * Add abstract class FlatModelMapper, Mapper extends FlatMapper > * Add abstract class ModelMapper extends FlatModelMapper -- This message was sent by Atlassian Jira (v8.3.4#803005)
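The ticket above names a small class hierarchy (FlatMapper as the base, Mapper extending it) for single-threaded execution over Row-type data. A hedged sketch of that shape follows; the `Row` stand-in, the `Consumer`-based collector, and all method signatures are assumptions for illustration, not the flink-ml API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal Row stand-in for org.apache.flink.types.Row.
class Row {
    final Object[] fields;
    Row(Object... fields) {
        this.fields = fields;
    }
}

// One-to-many mapping over rows: the base class named by the ticket.
abstract class FlatMapper {
    public abstract void flatMap(Row row, Consumer<Row> collector);
}

// One-to-one mapping, expressed as a FlatMapper that emits exactly one row,
// matching "Mapper extends FlatMapper" from the issue description.
abstract class Mapper extends FlatMapper {
    public abstract Row map(Row row);

    @Override
    public final void flatMap(Row row, Consumer<Row> collector) {
        collector.accept(map(row));
    }
}
```

With this shape, later algorithm implementations only override `map` (or `flatMap` for one-to-many operations) and the executor can treat every mapper uniformly through the base class.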
[GitHub] [flink] zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#discussion_r338177737 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -335,15 +336,51 @@ public void scheduleOnlyIfVertexIsCreated() throws Exception { } } + @Test + public void vertexIsNotAffectedByOutdatedDeployment() { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(2); + + testExecutionSlotAllocator.disableAutoCompletePendingRequests(); + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final Iterator vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator(); + final ArchivedExecutionVertex v1 = vertexIterator.next(); + final ArchivedExecutionVertex v2 = vertexIterator.next(); + + final SchedulingExecutionVertex sv1 = scheduler.getSchedulingTopology().getVertices().iterator().next(); + + // fail v1 and let it recover to SCHEDULED + scheduler.updateTaskExecutionState(new TaskExecutionState( + jobGraph.getJobID(), + v1.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.FAILED)); + taskRestartExecutor.triggerScheduledTasks(); + + // fail v2 to finish all pending slot requests of the last round Review comment: In `EAGER` mode, v1 and v2 are scheduled in a batch, so the pending slot request of v2 also blocks the deployment of v1. Here we fail v2 to complete (cancel) its pending slot request to trigger `deployOrHandleError` of v1's outdated deployment.
[GitHub] [flink] flinkbot edited a comment on issue #9981: [FLINK-13195][sql-client] Add create table support for SqlClient
flinkbot edited a comment on issue #9981: [FLINK-13195][sql-client] Add create table support for SqlClient URL: https://github.com/apache/flink/pull/9981#issuecomment-545714180 ## CI report: * 52a10e0283f21047ac18d6668b71cfa4f6fc21a9 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133304303) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] JingsongLi commented on issue #9884: [FLINK-14266][table] Introduce RowCsvInputFormat to new CSV module
JingsongLi commented on issue #9884: [FLINK-14266][table] Introduce RowCsvInputFormat to new CSV module URL: https://github.com/apache/flink/pull/9884#issuecomment-545719375 > Thanks @JingsongLi for the PR. I added an initial set of comments. It would be great if we could further reduce the number of limitations. The CSV format is one of the most important batch connectors and should have a feature set similar to the (de)serialization schema. Otherwise we need to document a lot of limitations in descriptors and docs. Thanks @twalthr for your review. These limitations are compared against the previous CsvInputFormat in flink-java, not the RFC-compliant (de)serialization schema in flink-csv. Some can still be improved; others are harder (they depend on Jackson). You are right, we need to document a lot of limitations in the docs.
[GitHub] [flink] flinkbot edited a comment on issue #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
flinkbot edited a comment on issue #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#issuecomment-542062879 ## CI report: * efdb2ad3942939590bbb0203ab04bae5fec634a9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131912243) * a30831a94681f1332648b34ce2978f883d4f33d5 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#issuecomment-539878774 ## CI report: * c8bf66050c865904c402bdc9c079a6e4de1a064c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131080663) * ec84f32bdcae0738d97ac60090691789334a279a : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[jira] [Created] (FLINK-14514) Improve interface of FunctionContext#getJobParameter
Jark Wu created FLINK-14514: --- Summary: Improve interface of FunctionContext#getJobParameter Key: FLINK-14514 URL: https://issues.apache.org/jira/browse/FLINK-14514 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Jark Wu Currently, {{FunctionContext#getJobParameter}} gets the key-values from the env's ExecutionConfig, but during expression reduction it gets key-values from {{TableConfig#getConfiguration}}. As we are aiming to not expose the underlying env, users won't be able to access ExecutionConfig in the future. So I propose to get the values from {{TableConfig#getConfiguration}} and make this clear in the javadoc.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#discussion_r338177737 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -335,15 +336,51 @@ public void scheduleOnlyIfVertexIsCreated() throws Exception { } } + @Test + public void vertexIsNotAffectedByOutdatedDeployment() { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(2); + + testExecutionSlotAllocator.disableAutoCompletePendingRequests(); + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final Iterator vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator(); + final ArchivedExecutionVertex v1 = vertexIterator.next(); + final ArchivedExecutionVertex v2 = vertexIterator.next(); + + final SchedulingExecutionVertex sv1 = scheduler.getSchedulingTopology().getVertices().iterator().next(); + + // fail v1 and let it recover to SCHEDULED + scheduler.updateTaskExecutionState(new TaskExecutionState( + jobGraph.getJobID(), + v1.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.FAILED)); + taskRestartExecutor.triggerScheduledTasks(); + + // fail v2 to finish all pending slot requests of the last round Review comment: In `EAGER` mode, v1 and v2 are scheduled in a batch. So that the pending slot request of v2 also blocks the deployment of v1. Here we fail v2 to complete its slot request to trigger `deployOrHandleError` of v1's outdated deployment.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#discussion_r338179104 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java ## @@ -56,6 +56,8 @@ @Nullable private final SlotSharingGroupId slotSharingGroupId; + private boolean isReleased = false; Review comment: Ok. Done.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#discussion_r338172474 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -412,13 +412,17 @@ private static Throwable maybeWrapWithNoResourceAvailableException(final Throwab }; } - private void stopDeployment(final DeploymentHandle deploymentHandle) { - cancelExecutionVertex(deploymentHandle.getExecutionVertexId()); + private void releaseUnassignedSlotIfPresent(final DeploymentHandle deploymentHandle) { // Canceling the vertex normally releases the slot. However, we might not have assigned // the slot to the vertex yet. + // Only release unassigned slot to guarantee no vertex state change happens here. deploymentHandle .getLogicalSlot() - .ifPresent(logicalSlot -> logicalSlot.releaseSlot(null)); + .ifPresent(logicalSlot -> { + if (logicalSlot.getPayload() != null) { Review comment: I made a mistake, the check should be `logicalSlot.getPayload() == null` instead. It's not problematic without this check at the moment. But the check helps to ensure that no failover will be triggered by this release action in case of any faulty changes in the future. The possibility that a failover can be triggered here would make the vertex state hard to track.
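The corrected condition the reviewer describes (release a slot only while no payload is assigned, so the release can never trigger a vertex state change) can be illustrated with a self-contained sketch. `LogicalSlot`, `DeploymentHandle`, and `SlotRelease` below are trimmed hypothetical stand-ins for the real scheduler types, kept to just what the reviewed check touches.

```java
import java.util.Optional;

// Hypothetical stand-in for the slot type in the diff.
class LogicalSlot {
    private Object payload;
    private boolean released;

    void tryAssignPayload(Object payload) {
        this.payload = payload;
    }

    Object getPayload() {
        return payload;
    }

    void releaseSlot() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

// Hypothetical stand-in for the deployment handle holding an optional slot.
class DeploymentHandle {
    private final LogicalSlot logicalSlot;

    DeploymentHandle(LogicalSlot logicalSlot) {
        this.logicalSlot = logicalSlot;
    }

    Optional<LogicalSlot> getLogicalSlot() {
        return Optional.ofNullable(logicalSlot);
    }
}

class SlotRelease {
    // The corrected condition from the review: release only slots that have
    // no payload assigned yet, so releasing can never change a vertex state.
    static void releaseUnassignedSlotIfPresent(DeploymentHandle handle) {
        handle.getLogicalSlot().ifPresent(slot -> {
            if (slot.getPayload() == null) {
                slot.releaseSlot();
            }
        });
    }
}
```

An unassigned slot is released; a slot whose payload was already set is left alone, which is exactly the invariant the `== null` fix restores.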
[GitHub] [flink] zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#discussion_r338191388 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java ## @@ -111,10 +113,14 @@ public Payload getPayload() { @Override public CompletableFuture releaseSlot(@Nullable Throwable cause) { - slotOwner.returnLogicalSlot(this); + if (!isReleased) { Review comment: Not problematic currently. Actually I noticed this issue before `DefaultSchedulerTest` was refactored to use `TestExecutionSlotAllocator` instead of `SimpleSlotProvider`. When testing the new test (it was different from the current implementation since `SimpleSlotProvider` does not allow queued allocation), I saw a weird case where canceling a vertex made 2 new slots become available. The cause is that an assigned slot can be released twice, and each time a new slot is returned to `SimpleSlotProvider` from `TestingLogicalSlot`. (That's also why I'd like to only release unassigned slots in 8ab26a8cbf11df7301995b783a59c739e2ad26d3.) So I think it's still valid to fix it to avoid encountering such weird cases in the future.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment
zhuzhurk commented on a change in pull request #9902: [FLINK-14363][runtime] Prevent vertex from being affected by outdated deployment URL: https://github.com/apache/flink/pull/9902#discussion_r338176523 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -335,15 +336,51 @@ public void scheduleOnlyIfVertexIsCreated() throws Exception { } } + @Test + public void vertexIsNotAffectedByOutdatedDeployment() { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(2); + + testExecutionSlotAllocator.disableAutoCompletePendingRequests(); + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final Iterator vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator(); + final ArchivedExecutionVertex v1 = vertexIterator.next(); + final ArchivedExecutionVertex v2 = vertexIterator.next(); + + final SchedulingExecutionVertex sv1 = scheduler.getSchedulingTopology().getVertices().iterator().next(); + + // fail v1 and let it recover to SCHEDULED + scheduler.updateTaskExecutionState(new TaskExecutionState( + jobGraph.getJobID(), + v1.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.FAILED)); + taskRestartExecutor.triggerScheduledTasks(); + + // fail v2 to finish all pending slot requests of the last round + // in order to trigger the outdated deployment of v1 + scheduler.updateTaskExecutionState(new TaskExecutionState( + jobGraph.getJobID(), + v2.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.FAILED)); + + // v1 should not be affected + assertThat(sv1.getState(), is(equalTo(ExecutionState.SCHEDULED))); + } + private void waitForTermination(final DefaultScheduler scheduler) throws Exception { scheduler.getTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); } private static JobGraph singleNonParallelJobVertexJobGraph() { + return singleNonParallelJobVertexJobGraph(1); + } + + private static JobGraph singleNonParallelJobVertexJobGraph(final int parallelism) { Review comment: done.
[GitHub] [flink] flinkbot commented on issue #9981: [FLINK-13195][sql-client] Add create table support for SqlClient
flinkbot commented on issue #9981: [FLINK-13195][sql-client] Add create table support for SqlClient URL: https://github.com/apache/flink/pull/9981#issuecomment-545714180 ## CI report: * 52a10e0283f21047ac18d6668b71cfa4f6fc21a9 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[jira] [Commented] (FLINK-14513) Implement listPartitionsByFilter to HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-14513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958469#comment-16958469 ] Terry Wang commented on FLINK-14513: Feel free to assign this task to me :) > Implement listPartitionsByFilter to HiveCatalog > --- > > Key: FLINK-14513 > URL: https://issues.apache.org/jira/browse/FLINK-14513 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Jingsong Lee >Priority: Major > Fix For: 1.10.0 > >
[jira] [Updated] (FLINK-14512) Introduce listPartitionsByFilter to Catalog
[ https://issues.apache.org/jira/browse/FLINK-14512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-14512: - Fix Version/s: 1.10.0 > Introduce listPartitionsByFilter to Catalog > --- > > Key: FLINK-14512 > URL: https://issues.apache.org/jira/browse/FLINK-14512 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > Fix For: 1.10.0 > > > planner will use catalog to do partition pruning. > So catalog need provide listPartitionsByFilter to partition pruning.
[jira] [Created] (FLINK-14513) Implement listPartitionsByFilter to HiveCatalog
Jingsong Lee created FLINK-14513: Summary: Implement listPartitionsByFilter to HiveCatalog Key: FLINK-14513 URL: https://issues.apache.org/jira/browse/FLINK-14513 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Jingsong Lee Fix For: 1.10.0
[jira] [Created] (FLINK-14512) Introduce listPartitionsByFilter to Catalog
Jingsong Lee created FLINK-14512: Summary: Introduce listPartitionsByFilter to Catalog Key: FLINK-14512 URL: https://issues.apache.org/jira/browse/FLINK-14512 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Jingsong Lee The planner will use the catalog to do partition pruning. So the catalog needs to provide listPartitionsByFilter for partition pruning.
[jira] [Updated] (FLINK-14249) Rework table partition support
[ https://issues.apache.org/jira/browse/FLINK-14249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-14249: - Description: A partition is a division of a logical database or its constituent elements into distinct independent parts. Database partitioning is normally done for manageability, performance or availability reasons, or for load balancing. Partition is widely used in hive. Especially in the ETL domain, most tables have partition attributes, which allow users to continue processing. Partition is more convenient for data management, time partitioning and business partitioning are common. We have supported a basic version for Flink table partitioning at 1.9 release, and this JIRA aims to improve partitioning support. See [https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support] was: A partition is a division of a logical database or its constituent elements into distinct independent parts. Database partitioning is normally done for manageability, performance or availability reasons, or for load balancing. Partition is widely used in hive. Especially in the ETL domain, most tables have partition attributes, which allow users to continue processing. Partition is more convenient for data management, time partitioning and business partitioning are common. We have supported a basic version for Flink table partitioning at 1.9 release, and this JIRA aims to improve partitioning support. See [https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design] > Rework table partition support > -- > > Key: FLINK-14249 > URL: https://issues.apache.org/jira/browse/FLINK-14249 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Jingsong Lee >Priority: Major > Fix For: 1.10.0 > > > A partition is a division of a logical database or its constituent elements > into distinct independent parts. 
Database partitioning is normally done for > manageability, performance or availability reasons, or for load balancing. > Partition is widely used in hive. Especially in the ETL domain, most tables > have partition attributes, which allow users to continue processing. > Partition is more convenient for data management, time partitioning and > business partitioning are common. > We have supported a basic version for Flink table partitioning at 1.9 > release, and this JIRA aims to improve partitioning support. > See > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support]
[jira] [Commented] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958466#comment-16958466 ] Yang Wang commented on FLINK-14511: --- Hi [~Dillon.], thanks for this issue. I think we need more discussion here. As far as I know, there are two schedulers in Yarn, and they have different policies for queue names. For the fair scheduler, the queue is an absolute path like 'root.a.x', and the leaf nodes could share a name. That means we could have two queues 'root.a.x' and 'root.b.x'. So when we submit a Flink application to the fair scheduler, `-yqu` needs to be an absolute path. For the capacity scheduler, the leaf node is unique, and we can only use the leaf node name to submit an application. If you are using the capacity scheduler, I think you should set `-yqu` to 'product', not 'root.product', and use 'root.product' for the fair scheduler. > Checking YARN queues should add "root" prefix > - > > Key: FLINK-14511 > URL: https://issues.apache.org/jira/browse/FLINK-14511 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Reporter: Zhanchun Zhang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > As we all know, all queues in the YARN cluster are children of the "root" > queue. While submitting an application to the "root.product" queue with the -qu > product parameter, the client logs that "The specified queue 'product' does > not exist. Available queues". But this queue exists and we can still > submit the application to the YARN cluster, which is confusing for users. So I think > that queue checking should add a "root." prefix to the queue name. > {code:java} > List queues = yarnClient.getAllQueues(); > if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are > queues configured in yarn and for this session.
> boolean queueFound = false; > for (QueueInfo queue : queues) { > if (queue.getQueueName().equals(this.yarnQueue)) { > queueFound = true; > break; > } > } > if (!queueFound) { > String queueNames = ""; > for (QueueInfo queue : queues) { > queueNames += queue.getQueueName() + ", "; > } > LOG.warn("The specified queue '" + this.yarnQueue + "' does not > exist. " + > "Available queues: " + queueNames); > } > {code}
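The check quoted in the issue compares the configured queue name for exact equality only. A minimal sketch of the proposed fix follows, matching a queue once a "root." prefix is accounted for on either side; `YarnQueueChecker` and its plain-string queue names are hypothetical stand-ins for the real client code, which iterates `QueueInfo` objects from `yarnClient.getAllQueues()`.

```java
import java.util.Arrays;
import java.util.List;

class YarnQueueChecker {
    // Accept the configured queue if it matches a reported queue exactly,
    // or if prepending "root." to either side makes them equal. This covers
    // both leaf names ("product") and absolute paths ("root.product").
    static boolean queueExists(String requested, List<String> availableQueueNames) {
        for (String name : availableQueueNames) {
            if (name.equals(requested)
                    || ("root." + name).equals(requested)
                    || name.equals("root." + requested)) {
                return true;
            }
        }
        return false;
    }
}
```

With this check, submitting with `-yqu root.product` no longer produces a spurious "queue does not exist" warning when the cluster reports the queue as plain "product", and vice versa.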
[GitHub] [flink] KurtYoung commented on issue #9966: [FLINK-14053] [blink-planner] DenseRankAggFunction.accumulateExpressions. it should be thinki…
KurtYoung commented on issue #9966: [FLINK-14053] [blink-planner] DenseRankAggFunction.accumulateExpressions. it should be thinki… URL: https://github.com/apache/flink/pull/9966#issuecomment-545711523 @liuyongvs There are some compilation errors, please fix them first.
[jira] [Commented] (FLINK-14499) MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
[ https://issues.apache.org/jira/browse/FLINK-14499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958461#comment-16958461 ] Zili Chen commented on FLINK-14499: --- Thanks for your attention [~AT-Fieldless]. I've assigned the issue to you. You can use an IDE like IDEA to list out the usages of `MetricRegistry#getMetricQueryServiceGatewayRpcAddress` (from/to its use point). For example, some testing implementations like {{HATestingDispatcher}} always receive a {{null}} {{metricQueryServiceAddress}} because it is not in use. However, you may be able to replace it with a non-null dummy value. > MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull > > > Key: FLINK-14499 > URL: https://issues.apache.org/jira/browse/FLINK-14499 > Project: Flink > Issue Type: Improvement >Reporter: Zili Chen >Assignee: AT-Fieldless >Priority: Minor > > As codebase moved, I suspect > {{MetricRegistry#getMetricQueryServiceGatewayRpcAddress}} is now non-null. > One can try to figure it out and if so, refactor related code a bit.
[jira] [Assigned] (FLINK-14499) MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
[ https://issues.apache.org/jira/browse/FLINK-14499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen reassigned FLINK-14499: - Assignee: AT-Fieldless > MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull > > > Key: FLINK-14499 > URL: https://issues.apache.org/jira/browse/FLINK-14499 > Project: Flink > Issue Type: Improvement >Reporter: Zili Chen >Assignee: AT-Fieldless >Priority: Minor > > As codebase moved, I suspect > {{MetricRegistry#getMetricQueryServiceGatewayRpcAddress}} is now non-null. > One can try to figure it out and if so, refactor related code a bit.
[GitHub] [flink] liuyongvs commented on issue #9966: [FLINK-14053] [blink-planner] DenseRankAggFunction.accumulateExpressions. it should be thinki…
liuyongvs commented on issue #9966: [FLINK-14053] [blink-planner] DenseRankAggFunction.accumulateExpressions. it should be thinki… URL: https://github.com/apache/flink/pull/9966#issuecomment-545707154 > Your base branch seems to be wrong, please use latest master branch and create a fix based on that. Hi @KurtYoung , I have already resolved the conflicts.