[jira] [Comment Edited] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711200#comment-17711200 ] Aitozi edited comment on FLINK-18027 at 4/12/23 5:43 AM: - The content was migrated from [https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md] to [https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]. I think the content below was removed accidentally. CC [~sjwiesman] {code:java} -- explicit ROW constructor ROW(value1 [, value2]*) {code} I think we should add this back, since the explicit ROW constructor's limitation has been resolved. was (Author: aitozi): The content was migrated from [https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md] to [https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]. I think the content below was removed accidentally. {code:java} -- explicit ROW constructor ROW(value1 [, value2]*) {code} I think we should add this back, since the explicit ROW constructor's limitation has been resolved. > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > will throw exceptions: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." 
at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627) > at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused > by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line > 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... 
at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) > at >
[GitHub] [flink] Aitozi commented on a diff in pull request #22378: [FLINK-31344][planner] Support to update nested columns in update sta…
Aitozi commented on code in PR #22378: URL: https://github.com/apache/flink/pull/22378#discussion_r1162597631 ## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml: ## @@ -20,17 +20,17 @@ limitations under the License.
[GitHub] [flink] Aitozi commented on a diff in pull request #22378: [FLINK-31344][planner] Support to update nested columns in update sta…
Aitozi commented on code in PR #22378: URL: https://github.com/apache/flink/pull/22378#discussion_r1162702918 ## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml: ## @@ -20,17 +20,17 @@ limitations under the License.
[GitHub] [flink-ml] lindong28 opened a new pull request, #232: [hotfix] Update pattern matching in test_ml_lib_completeness to work with release branch
lindong28 opened a new pull request, #232: URL: https://github.com/apache/flink-ml/pull/232 ## What is the purpose of the change Update pattern matching in test_ml_lib_completeness to work with release branch ## Brief change log Updated test_ml_lib_completeness. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled
[ https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711185#comment-17711185 ] Weihua Hu commented on FLINK-31775: --- Thanks [~sergiosp] for reporting this. IMHO, this is the same issue as FLINK-28171. After FLINK-28171 is fixed, you can define the appProtocol in the podTemplate both for kubernetes-operator and native kubernetes. > High-Availability not supported in kubernetes when istio enabled > > > Key: FLINK-31775 > URL: https://issues.apache.org/jira/browse/FLINK-31775 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.16.1 >Reporter: Sergio Sainz >Priority: Major > > When using native kubernetes deployment mode with high-availability (HA), and > when a new TaskManager pod is started to process a job, the TaskManager pod > will attempt to register itself with the resource manager (JobManager). The > TaskManager looks up the resource manager by IP address > (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1). > > Nevertheless, when istio is enabled, resolution by IP address is blocked, > and hence the job cannot start because the task manager cannot > register with the resource manager: > 2023-04-10 23:24:19,752 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not > resolve ResourceManager address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. > > Notice that when HA is disabled, the resource manager is resolved by > service name, and so it can be found: > > 2023-04-11 00:49:34,162 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful > registration at resource manager > akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* > under registration id 83ad942597f86aa880ee96f1c2b8b923. 
> > Notice that in my case it is not possible to disable istio, as explained here: > [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] > > Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , I am > logging this as a separate defect as I believe the fix for FLINK-28171 won't fix this > case. FLINK-28171 is about the Flink Kubernetes Operator and this is about > native kubernetes deployment. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask
pgaref commented on code in PR #21923: URL: https://github.com/apache/flink/pull/21923#discussion_r1163517899 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() { @Override public final void cancel() throws Exception { -isRunning = false; -canceled = true; +taskState.canceled = true; Review Comment: Hey @rkhachatryan , thanks for taking another look! I just wanted to maintain the existing logic where both flags (running and canceled) are true, but I agree that having to maintain both the enum and the flags is confusing -- thinking it over, it looks like we only need to protect from invalid transitions in general, and more specifically here from CANCELED to RUNNING, as added -- WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
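The guard discussed above can be sketched as a tiny enum-based state machine. This is a hypothetical simplification for illustration only, not the actual StreamTask code; the class name, states, and `transitionTo` method are made up:

```java
// Hypothetical sketch: a single enum replacing the isRunning/canceled flags,
// with a guard against the invalid CANCELED -> RUNNING transition.
public class TaskStateDemo {
    enum State { CREATED, RUNNING, CANCELED, FINISHED }

    private State state = State.CREATED;

    // Applies the transition and returns true, or rejects it and returns false.
    synchronized boolean transitionTo(State target) {
        // A canceled task must never go back to RUNNING.
        if (state == State.CANCELED && target == State.RUNNING) {
            return false;
        }
        state = target;
        return true;
    }

    State current() {
        return state;
    }

    public static void main(String[] args) {
        TaskStateDemo task = new TaskStateDemo();
        task.transitionTo(State.RUNNING);
        task.transitionTo(State.CANCELED);
        // The invalid transition is rejected and the state stays CANCELED.
        System.out.println(task.transitionTo(State.RUNNING) + " " + task.current()); // prints: false CANCELED
    }
}
```

With a single guarded enum, "both flags true" becomes unrepresentable, which is the confusion the review comment is pointing at.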
[jira] [Updated] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew
[ https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liam updated FLINK-31762: - Description: To simplify the demonstration, let us assume that there are two topics, and each topic has four partitions. We have set the parallelism to eight to consume these two topics. However, the current partition assignment method may lead to some subtasks being assigned two partitions while others are left with none. !image-2023-04-11-08-00-16-054.png|width=500,height=143! In my case, the situation is even worse as I have ten topics, each with 100 partitions. If I set the parallelism to 1000, some slots may be assigned seven partitions while others remain unassigned. To address this issue, I propose a new partition assignment solution. In this approach, round-robin assignment takes place between all topics, not just one. For example, the ideal assignment for the case mentioned above is presented below: !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134! This new solution can also handle cases where each topic has more partitions. !image-2023-04-11-08-12-24-115.png|width=444,height=127! Let us work together to reach a consensus on this proposal. Thank you! 
FYI: how the partitions are currently assigned {code:java} public class KafkaTopicPartitionAssigner { public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) { return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks); } public static int assign(String topic, int partition, int numParallelSubtasks) { int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks; // here, the assumption is that the id of Kafka partitions are always ascending // starting from 0, and therefore can be used directly as the offset clockwise from the // start index return (startIndex + partition) % numParallelSubtasks; } } {code} For the Kafka Source, it's implemented in the KafkaSourceEnumerator as below {code:java} static int getSplitOwner(TopicPartition tp, int numReaders) { int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders; // here, the assumption is that the id of Kafka partitions are always ascending // starting from 0, and therefore can be used directly as the offset clockwise from the // start index return (startIndex + tp.partition()) % numReaders; } {code} was: To simplify the demonstration, let us assume that there are two topics, and each topic has four partitions. We have set the parallelism to eight to consume these two topics. However, the current partition assignment method may lead to some subtasks being assigned two partitions while others are left with none. !image-2023-04-11-08-00-16-054.png|width=500,height=143! In my case, the situation is even worse as I have ten topics, each with 100 partitions. If I set the parallelism to 1000, some slots may be assigned seven partitions while others remain unassigned. To address this issue, I propose a new partition assignment solution. In this approach, round-robin assignment takes place between all topics, not just one. 
For example, the ideal assignment for the case mentioned above is presented below: !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134! This new solution can also handle cases where each topic has more partitions. !image-2023-04-11-08-12-24-115.png|width=444,height=127! Let us work together to reach a consensus on this proposal. Thank you! FYI: how the partitions are currently assigned {code:java} public class KafkaTopicPartitionAssigner { public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) { return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks); } public static int assign(String topic, int partition, int numParallelSubtasks) { int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks; // here, the assumption is that the id of Kafka partitions are always ascending // starting from 0, and therefore can be used directly as the offset clockwise from the // start index return (startIndex + partition) % numParallelSubtasks; } } {code} > Subscribe to multiple Kafka topics may cause partition assignment skew > -- > > Key: FLINK-31762 > URL: https://issues.apache.org/jira/browse/FLINK-31762 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.13.0, 1.18.0 >Reporter: Liam >Priority: Major > Attachments: image-2023-04-11-08-00-16-054.png, >
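The skew described in this issue can be reproduced directly from the quoted formula. A minimal sketch; the class and topic names below are made up for illustration, and only `assign` itself mirrors the snippet quoted in the issue (with the mask written out in full as 0x7FFFFFFF, as in the Flink source):

```java
import java.util.HashMap;
import java.util.Map;

// Demonstrates how the current per-topic assignment formula can skew
// partition distribution when several topics are subscribed together.
public class AssignSkewDemo {

    // Same formula as KafkaTopicPartitionAssigner / KafkaSourceEnumerator.
    public static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition ids are assumed to ascend from 0, so the partition id is
        // used directly as the clockwise offset from the start index.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 8;
        String[] topics = {"topic-a", "topic-b"}; // hypothetical topic names
        Map<Integer, Integer> partitionsPerSubtask = new HashMap<>();
        for (String topic : topics) {
            for (int p = 0; p < 4; p++) {
                partitionsPerSubtask.merge(assign(topic, p, parallelism), 1, Integer::sum);
            }
        }
        // Each topic occupies 4 consecutive subtask indices starting at its
        // own startIndex; whenever the two ranges overlap, some subtasks get
        // two partitions while others get none.
        System.out.println(partitionsPerSubtask);
    }
}
```

Because the start index depends only on the topic's hash, nothing spreads the per-topic runs apart, which is why the proposed cross-topic round-robin avoids the skew.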
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer
jiangxin369 commented on code in PR #229: URL: https://github.com/apache/flink-ml/pull/229#discussion_r1161619708 ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java: ## @@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig( } } +StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl); +for (int i = 0; i < inputs.length; ++i) { +if (inputs[i] instanceof NetworkInputConfig) { +TypeSerializer typeSerializerIn = +((NetworkInputConfig) inputs[i]).getTypeSerializer(); +if (typeSerializerIn instanceof IterationRecordSerializer) { Review Comment: Yes, you're right. I removed the `if ... instanceOf ...` check. And it seems that we did not modify the `StateKeySerializer` at all, so I also removed that part. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] jiangxin369 commented on pull request #231: [FLINK-31374] ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
jiangxin369 commented on PR #231: URL: https://github.com/apache/flink-ml/pull/231#issuecomment-1504431951 @zhipeng93 Thanks for the review, I've updated the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement
[ https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711166#comment-17711166 ] lincoln lee commented on FLINK-31301: - [~aitozi] thanks for fixing this! I'll try to find some time to take a look. > Unsupported nested columns in column list of insert statement > - > > Key: FLINK-31301 > URL: https://issues.apache.org/jira/browse/FLINK-31301 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.16.1 >Reporter: lincoln lee >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > Currently an error will be raised when using nested columns in the column list of > an insert statement, e.g., > {code:java} > INSERT INTO nested_type_sink (a,b.b1,c.c2,f) > SELECT a,b.b1,c.c2,f FROM nested_type_src > {code} > > {code:java} > java.lang.AssertionError > at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333) > at > org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164) > at > 
org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711161#comment-17711161 ] luoyuxia edited comment on FLINK-31533 at 4/12/23 1:27 AM: --- [~aitozi] Thanks for volunteering, but please note that with the syntax proposed in [FLIP-218|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185] {code:java} CREATE TABLE [ IF NOT EXISTS ] table_name [ WITH ( table_properties ) ] [ AS query_expression ] {code} we can't define partitions. We need to extend the syntax, which may need a FLIP. Also, please remember to consider FLINK-31534 while extending the syntax. was (Author: luoyuxia): [~aitozi] Thanks for volunteering, but please note that with the syntax proposed in [FLIP-218|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185] {code:java} CREATE TABLE [ IF NOT EXISTS ] table_name [ WITH ( table_properties ) ] [ AS query_expression ] {code} we can't define partitions. We need to extend the syntax, which may need a FLIP. Also, please remember to consider FLINK-31534 while extending the syntax. > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
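For illustration only, one conceivable shape for such an extended syntax, borrowing the PARTITIONED BY clause that engines like Hive and Spark use for CTAS; this is a hypothetical sketch, not a syntax decided by any FLIP:

```sql
-- Hypothetical extension of the FLIP-218 CTAS grammar (illustrative only):
CREATE TABLE [ IF NOT EXISTS ] table_name
  [ PARTITIONED BY ( partition_column [, partition_column ]* ) ]
  [ WITH ( table_properties ) ]
  [ AS query_expression ]
```

Any real design would also need to settle how such a clause interacts with the column-ordering concerns tracked in FLINK-31534.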
[jira] [Comment Edited] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711161#comment-17711161 ] luoyuxia edited comment on FLINK-31533 at 4/12/23 1:27 AM: --- [~aitozi] Thanks for voluntering, but please remember with the syntax that proposed in [FLIP-218|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185]] {code:java} CREATE TABLE [ IF NOT EXISTS ] table_name [ WITH ( table_properties ) ] [ AS query_expression ] {code} , we can't define partition. We need to extend the syntax which may need a FLIP. Also, please remember to consider FLINK-31534 while extending the syntax. was (Author: luoyuxia): [~aitozi] Thanks for voluntering, but please remember with the syntax that proposed in [FLIP-218|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185]] {code:java} CREATE TABLE [ IF NOT EXISTS ] table_name [ WITH ( table_properties ) ] [ AS query_expression ] {code} , we can't define partition. We need to extend the syntax which may need a FLIP. Also, please remember to consider FLINK-31534 while extending the syntax. > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711161#comment-17711161 ] luoyuxia commented on FLINK-31533: -- [~aitozi] Thanks for volunteering, but please note that with the syntax proposed in [FLIP-218|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185] {code:java} CREATE TABLE [ IF NOT EXISTS ] table_name [ WITH ( table_properties ) ] [ AS query_expression ] {code} we can't define a partition. We need to extend the syntax, which may require a FLIP. Also, please remember to consider FLINK-31534 while extending the syntax. > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
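One possible direction for the extension discussed above, shown purely as an illustration: the {{PARTITIONED BY}} placement below is an assumption and not syntax agreed in any FLIP.

{code:sql}
-- Hypothetical CTAS extension sketch (illustrative only):
-- adds an optional partition clause to the FLIP-218 grammar.
CREATE TABLE [ IF NOT EXISTS ] table_name
  [ PARTITIONED BY ( partition_column [, partition_column ]* ) ]
  [ WITH ( table_properties ) ]
  [ AS query_expression ]
{code}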
[jira] [Assigned] (FLINK-31774) Add document for delete and update statement
[ https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia reassigned FLINK-31774: Assignee: Aitozi > Add document for delete and update statement > > > Key: FLINK-31774 > URL: https://issues.apache.org/jira/browse/FLINK-31774 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aitozi >Assignee: Aitozi >Priority: Major > > I could not find the usage of the DELETE and UPDATE statements documented > in the SQL section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31774) Add document for delete and update statement
[ https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711159#comment-17711159 ] luoyuxia commented on FLINK-31774: -- [~aitozi] Thanks for raising it and volunteering. I missed adding these statements since currently no connector has integrated the delete & update API. But I think it is worth documenting since Flink itself supports such syntax. Assigned to you now; please ping me to review after the PR is ready. > Add document for delete and update statement > > > Key: FLINK-31774 > URL: https://issues.apache.org/jira/browse/FLINK-31774 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aitozi >Priority: Major > > I could not find the usage of the DELETE and UPDATE statements documented > in the SQL section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
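For reference, the statements to be documented follow the standard row-level form. The table name and predicates below are illustrative, and whether a given statement actually executes depends on the connector implementing the delete/update API.

{code:sql}
-- Row-level delete (sketch)
DELETE FROM Orders WHERE order_id = 1;

-- Row-level update (sketch)
UPDATE Orders SET price = price * 0.9 WHERE currency = 'EUR';
{code}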
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup
lindong28 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163455375 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java: ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.datastream.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.CoGroupTaskIterator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; +import 
org.apache.flink.util.TraversableOnceException; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * An operator that implements the co-group logic. + * + * @param The class type of the first input. + * @param The class type of the second input. + * @param The class type of the key. + * @param The class type of the output values. + */ +public class CoGroupOperator +extends AbstractUdfStreamOperator> +implements TwoInputStreamOperator, BoundedMultiInput { + +private PushSorter>> sorterA; +private PushSorter>> sorterB; +private TypeComparator>> comparatorA; +private TypeComparator>> comparatorB; +private KeySelector keySelectorA; +private KeySelector keySelectorB; +private TypeSerializer>> keyAndValueSerializerA; +private TypeSerializer>> keyAndValueSerializerB; +private TypeSerializer keySerializer; +private DataOutputSerializer dataOutputSerializer; +private long lastWatermarkTimestamp = Long.MIN_VALUE; +private int remainingInputNum = 2; + +public CoGroupOperator(CoGroupFunction function) { +super(function); +} + +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader(); +MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager(); +IOManager ioManager = containingTask.getEnvironment().getIOManager(); + +keySelectorA = config.getStatePartitioner(0, userCodeClassLoader); +keySelectorB = config.getStatePartitioner(1, userCodeClassLoader); +keySerializer = config.getStateKeySerializer(userCodeClassLoader); +int keyLength = keySerializer.getLength(); + +TypeSerializer typeSerializerA = config.getTypeSerializerIn(0,
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup
lindong28 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163453972 ## flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java: ## @@ -59,6 +64,54 @@ public void testMapPartition() throws Exception { new int[] {5, 5, 5, 5}, counts.stream().mapToInt(Integer::intValue).toArray()); } +@Test +public void testCoGroup() throws Exception { +DataStream> data1 = +env.fromCollection( +Arrays.asList(Tuple2.of(1, 1), Tuple2.of(2, 2), Tuple2.of(3, 3))); +DataStream> data2 = +env.fromCollection( +Arrays.asList( +Tuple2.of(1, 1.5), +Tuple2.of(5, 5.5), +Tuple2.of(3, 3.5), +Tuple2.of(1, 2.5))); +DataStream result = +DataStreamUtils.coGroup( +data1, +data2, +(KeySelector, Integer>) tuple -> tuple.f0, +(KeySelector, Integer>) tuple -> tuple.f0, +BasicTypeInfo.DOUBLE_TYPE_INFO, +new CoGroupFunction< +Tuple2, Tuple2, Double>() { +@Override +public void coGroup( +Iterable> iterableA, +Iterable> iterableB, +Collector collector) { +List> valuesA = + IteratorUtils.toList(iterableA.iterator()); +List> valuesB = + IteratorUtils.toList(iterableB.iterator()); + +double sum = 0; +for (Tuple2 value : valuesA) { +sum += value.f1; +} +for (Tuple2 value : valuesB) { +sum += value.f1; +} +collector.collect(sum); +} +}); + +List resultValues = IteratorUtils.toList(result.executeAndCollect()); +double[] resultPrimitiveValues = + resultValues.stream().mapToDouble(Double::doubleValue).toArray(); +assertArrayEquals(new double[] {5.0, 2.0, 6.5, 5.5}, resultPrimitiveValues, 1e-5); Review Comment: Yes, I think this assert should always work. Can you explain why the result of this particular co-group does not preserve order? Note that `StreamExecutionEnvironment#fromCollection` returns a DataStream with parallelism=1. And therefore the co-group operator is executed with parallelism=1. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
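The expected array in the assert discussed above follows directly from the sort phase: with a single parallel instance, groups are emitted in ascending key order (1, 2, 3, 5), not input order. A standalone sketch of that behavior (plain Java, no Flink dependencies; the `TreeMap` stands in for the operator's sort, and all names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class SortMergeCoGroupSketch {

    // Sum the second field of both inputs per key; because the TreeMap models
    // the sort phase, groups come out in ascending key order.
    public static double[] coGroupSums(List<int[]> inputA, List<double[]> inputB) {
        TreeMap<Integer, Double> sums = new TreeMap<>();
        for (int[] t : inputA) {
            sums.merge(t[0], (double) t[1], Double::sum);
        }
        for (double[] t : inputB) {
            sums.merge((int) t[0], t[1], Double::sum);
        }
        return sums.values().stream().mapToDouble(Double::doubleValue).toArray();
    }

    public static void main(String[] args) {
        // Same data as the test under discussion.
        List<int[]> a = Arrays.asList(new int[] {1, 1}, new int[] {2, 2}, new int[] {3, 3});
        List<double[]> b =
                Arrays.asList(
                        new double[] {1, 1.5},
                        new double[] {5, 5.5},
                        new double[] {3, 3.5},
                        new double[] {1, 2.5});
        // Sorted key order 1, 2, 3, 5 makes the output deterministic with
        // parallelism=1: [5.0, 2.0, 6.5, 5.5]
        System.out.println(Arrays.toString(coGroupSums(a, b)));
    }
}
```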
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup
lindong28 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163453972 ## flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java: ## @@ -59,6 +64,54 @@ public void testMapPartition() throws Exception { new int[] {5, 5, 5, 5}, counts.stream().mapToInt(Integer::intValue).toArray()); } +@Test +public void testCoGroup() throws Exception { +DataStream> data1 = +env.fromCollection( +Arrays.asList(Tuple2.of(1, 1), Tuple2.of(2, 2), Tuple2.of(3, 3))); +DataStream> data2 = +env.fromCollection( +Arrays.asList( +Tuple2.of(1, 1.5), +Tuple2.of(5, 5.5), +Tuple2.of(3, 3.5), +Tuple2.of(1, 2.5))); +DataStream result = +DataStreamUtils.coGroup( +data1, +data2, +(KeySelector, Integer>) tuple -> tuple.f0, +(KeySelector, Integer>) tuple -> tuple.f0, +BasicTypeInfo.DOUBLE_TYPE_INFO, +new CoGroupFunction< +Tuple2, Tuple2, Double>() { +@Override +public void coGroup( +Iterable> iterableA, +Iterable> iterableB, +Collector collector) { +List> valuesA = + IteratorUtils.toList(iterableA.iterator()); +List> valuesB = + IteratorUtils.toList(iterableB.iterator()); + +double sum = 0; +for (Tuple2 value : valuesA) { +sum += value.f1; +} +for (Tuple2 value : valuesB) { +sum += value.f1; +} +collector.collect(sum); +} +}); + +List resultValues = IteratorUtils.toList(result.executeAndCollect()); +double[] resultPrimitiveValues = + resultValues.stream().mapToDouble(Double::doubleValue).toArray(); +assertArrayEquals(new double[] {5.0, 2.0, 6.5, 5.5}, resultPrimitiveValues, 1e-5); Review Comment: Can you explain why the result of this particular co-group does not preserve order? Note that `StreamExecutionEnvironment#fromCollection` returns a DataStream with parallelism=1. And therefore the co-group operator is executed with parallelism=1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup
lindong28 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163453277 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java: ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.datastream.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.CoGroupTaskIterator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; +import 
org.apache.flink.util.TraversableOnceException; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * An operator that implements the co-group logic. + * + * @param The class type of the first input. + * @param The class type of the second input. + * @param The class type of the key. + * @param The class type of the output values. + */ +public class CoGroupOperator +extends AbstractUdfStreamOperator> +implements TwoInputStreamOperator, BoundedMultiInput { + +private PushSorter>> sorterA; Review Comment: The main reason is to reduce the amount of work in this PR that will be discarded in the future. In order to change the type from `StreamRecord` to `IN1`, we would need to update the classes `BytesKeyNormalizationUtil`, `FixedLengthByteKeyComparator`, `KeyAndValueSerializer` and `VariableLengthByteKeyComparator` so that they don't rely on `StreamRecord`. The above four classes are copied and pasted from apache/flink. And we will likely remove them from Flink ML in the future after we optimize the cogroup API in apache/flink. And I am not sure the amount of work described above will be useful later. Given that the overhead of DataStream is roughly 12.3% more than than DataSet, and the performance difference will likely be negligible for non-trivial algorithms such as ALS, I find it reasonable to just use the StreamRecord here as the short term approach and spend time on the long term approach (i.e. optimizing apache/flink). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup
lindong28 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163450592 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java: ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.datastream.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.CoGroupTaskIterator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; +import 
org.apache.flink.util.TraversableOnceException; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * An operator that implements the co-group logic. + * + * @param The class type of the first input. + * @param The class type of the second input. + * @param The class type of the key. + * @param The class type of the output values. + */ +public class CoGroupOperator +extends AbstractUdfStreamOperator> +implements TwoInputStreamOperator, BoundedMultiInput { + +private PushSorter>> sorterA; +private PushSorter>> sorterB; +private TypeComparator>> comparatorA; +private TypeComparator>> comparatorB; +private KeySelector keySelectorA; +private KeySelector keySelectorB; +private TypeSerializer>> keyAndValueSerializerA; +private TypeSerializer>> keyAndValueSerializerB; +private TypeSerializer keySerializer; +private DataOutputSerializer dataOutputSerializer; +private long lastWatermarkTimestamp = Long.MIN_VALUE; +private int remainingInputNum = 2; + +public CoGroupOperator(CoGroupFunction function) { +super(function); +} + +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader(); +MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager(); +IOManager ioManager = containingTask.getEnvironment().getIOManager(); + +keySelectorA = config.getStatePartitioner(0, userCodeClassLoader); +keySelectorB = config.getStatePartitioner(1, userCodeClassLoader); +keySerializer = config.getStateKeySerializer(userCodeClassLoader); +int keyLength = keySerializer.getLength(); + +TypeSerializer typeSerializerA = config.getTypeSerializerIn(0,
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup
lindong28 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163449957 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java: ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.datastream.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.CoGroupTaskIterator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; +import 
org.apache.flink.util.TraversableOnceException; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * An operator that implements the co-group logic. + * + * @param The class type of the first input. + * @param The class type of the second input. + * @param The class type of the key. + * @param The class type of the output values. + */ +public class CoGroupOperator +extends AbstractUdfStreamOperator> +implements TwoInputStreamOperator, BoundedMultiInput { + +private PushSorter>> sorterA; +private PushSorter>> sorterB; +private TypeComparator>> comparatorA; +private TypeComparator>> comparatorB; +private KeySelector keySelectorA; +private KeySelector keySelectorB; +private TypeSerializer>> keyAndValueSerializerA; +private TypeSerializer>> keyAndValueSerializerB; +private TypeSerializer keySerializer; +private DataOutputSerializer dataOutputSerializer; +private long lastWatermarkTimestamp = Long.MIN_VALUE; +private int remainingInputNum = 2; + +public CoGroupOperator(CoGroupFunction function) { +super(function); +} + +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader(); +MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager(); +IOManager ioManager = containingTask.getEnvironment().getIOManager(); + +keySelectorA = config.getStatePartitioner(0, userCodeClassLoader); +keySelectorB = config.getStatePartitioner(1, userCodeClassLoader); +keySerializer = config.getStateKeySerializer(userCodeClassLoader); +int keyLength = keySerializer.getLength(); + +TypeSerializer typeSerializerA = config.getTypeSerializerIn(0,
[GitHub] [flink-connector-kafka] tzulitai commented on pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai commented on PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1504292430 Merged via https://github.com/apache/flink-connector-kafka/commit/a7785630e714af303b224c38d9a6caa89a551265 I squashed my changes together with yours. Please let me know if you find anything odd that you would like me to address. Thanks again! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] tzulitai closed pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai closed pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink URL: https://github.com/apache/flink-connector-kafka/pull/18 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] tzulitai commented on pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai commented on PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1504282565 @AlexAxeman thanks a lot for your contribution! +1 LGTM. Since there's other work that depends on this feature, I'll proceed to merge this PR and address my own comments while merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-28171) Adjust Job and Task manager port definitions to work with Istio+mTLS
[ https://issues.apache.org/jira/browse/FLINK-28171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710375#comment-17710375 ] Sergio Sainz edited comment on FLINK-28171 at 4/11/23 11:46 PM: Hello [~wangyang0918]! I am also experiencing this issue with a native Kubernetes deployment with HA enabled (not with the Flink k8s operator). In [https://lists.apache.org/thread/yl40s9069wksz66qlf9t6jhmwsn59zft] you mentioned that "If HA enabled, the internal jobmanager rpc service will not be created. Instead, the TaskManager retrieves the JobManager address via HA services and connects to it via pod ip." Do you know whether we can change the way the TaskManager connects to HA services, so that it uses the pod name instead of the IP address? I think we cannot add the akka workaround of bypassing the Istio sidecar. Alternatively, do you know how to add TLS encryption to this channel (TaskManager -> HA service) manually? Thanks for the info ~ was (Author: sergiosp): Hello [~wangyang0918]! I am also experiencing this issue with a native Kubernetes deployment with HA enabled (not with the Flink k8s operator). In [https://lists.apache.org/thread/yl40s9069wksz66qlf9t6jhmwsn59zft] you mentioned that "If HA enabled, the internal jobmanager rpc service will not be created. Instead, the TaskManager retrieves the JobManager address via HA services and connects to it via pod ip." Do you know whether we can change the way the TaskManager connects to HA services, so that it uses the pod name instead of the IP address? I think we cannot add the akka workaround of bypassing the Istio sidecar.
Thanks for the info ~ > Adjust Job and Task manager port definitions to work with Istio+mTLS > > > Key: FLINK-28171 > URL: https://issues.apache.org/jira/browse/FLINK-28171 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Affects Versions: 1.14.4 > Environment: flink-kubernetes-operator 1.0.0 > Flink 1.14-java11 > Kubernetes v1.19.5 > Istio 1.7.6 >Reporter: Moshe Elisha >Assignee: Moshe Elisha >Priority: Major > Labels: pull-request-available > > Hello, > > We are launching Flink deployments using the [Flink Kubernetes > Operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/] > on a Kubernetes cluster with Istio and mTLS enabled. > > We found that the TaskManager is unable to communicate with the JobManager on > the jobmanager-rpc port: > > {{2022-06-15 15:25:40,508 WARN akka.remote.ReliableDeliverySupervisor > [] - Association with remote system > [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123] > has failed, address is now gated for [50] ms. Reason: [Association failed > with > [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123]] > Caused by: [The remote system explicitly disassociated (reason unknown).]}} > > The reason for the issue is that the JobManager service port definitions are > not following the Istio guidelines > [https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/] > (see example below). > > There was also an email discussion around this topic in the users mailing > group under the subject "Flink Kubernetes Operator with K8S + Istio + mTLS - > port definitions". > With the help of the community, we were able to work around the issue but it > was very hard and forced us to skip Istio proxy which is not ideal. 
> > We would like you to consider changing the default port definitions, either > # Rename the ports – I understand it is Istio specific guideline but maybe > it is better to at least be aligned with one (popular) vendor guideline > instead of none at all. > # Add the “appProtocol” property[1] that is not specific to any vendor but > requires Kubernetes >= 1.19 where it was introduced as beta and moved to > stable in >= 1.20. The option to add appProtocol property was added only in > [https://github.com/fabric8io/kubernetes-client/releases/tag/v5.10.0] with > [#3570|https://github.com/fabric8io/kubernetes-client/issues/3570]. > # Or allow a way to override the defaults. > > [https://kubernetes.io/docs/concepts/services-networking/_print/#application-protocol] > > > {{# k get service inference-results-to-analytics-engine -o yaml}} > {{apiVersion: v1}} > {{kind: Service}} > {{...}} > {{spec:}} > {{ clusterIP: None}} > {{ ports:}} > {{ - name: jobmanager-rpc *# should start with “tcp-“ or add "appProtocol" > property*}} > {{ port: 6123}} > {{ protocol: TCP}} > {{ targetPort: 6123}} > {{ - name: blobserver *# should start with
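The Istio protocol-selection guideline referenced in this issue can be illustrated with a small check. This is a hypothetical validator, not Flink or Istio code, and the prefix list below is only an illustrative subset of the protocols Istio recognizes: a port passes if its name starts with a recognized protocol prefix (e.g. `tcp-`) or if it carries an `appProtocol`.

```python
# Hypothetical validator for the Istio protocol-selection convention discussed
# above; the prefix list is an illustrative subset, not the full Istio set.
ISTIO_PROTOCOL_PREFIXES = ("tcp", "http", "http2", "https", "grpc", "tls", "mongo", "redis", "mysql")

def is_istio_friendly(port: dict) -> bool:
    """Return True if a Service port follows Istio's protocol-selection rules."""
    if port.get("appProtocol"):
        return True
    prefix = port.get("name", "").split("-", 1)[0]
    return prefix in ISTIO_PROTOCOL_PREFIXES

# The jobmanager-rpc port from the Service dump above fails the check as-is,
# but passes once renamed or given an appProtocol, matching the two options
# proposed in the issue.
assert not is_istio_friendly({"name": "jobmanager-rpc", "port": 6123})
assert is_istio_friendly({"name": "tcp-jobmanager-rpc", "port": 6123})
assert is_istio_friendly({"name": "jobmanager-rpc", "port": 6123, "appProtocol": "tcp"})
```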
[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163427396 ## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java: ## @@ -86,6 +89,16 @@ public void testDoNotAllowMultipleTopicSelector() { .isInstanceOf(IllegalStateException.class); } +@Test +public void testDoNotAllowMultipleHeaderProducers() { Review Comment: I don't think you really need this restriction for the headers. The key / value serializer setters had the check because there were different entry points for setting the serialization schema, i.e. either via `setKafkaXSerializer` or `setKeySerializationSchema`. We don't want the user to be setting the key serializer through both entrypoints, hence the check. This isn't a problem for the header provider setting. Same goes for the Kafka partitioner, for example - we don't check multiple sets there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
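The entry-point reasoning in this review can be illustrated with a toy builder. This is a hypothetical sketch, not the actual KafkaSink builder API: when two setters configure the same underlying field, the builder guards against the user calling both; a field with a single entry point (like the header provider here) needs no such check.

```python
# Toy builder illustrating the review comment above (hypothetical API, not
# the real KafkaRecordSerializationSchemaBuilder).
class SketchBuilder:
    def __init__(self):
        self._key_serializer = None
        self._header_provider = None

    def _check_key_not_set(self):
        # Guard needed because two entry points write the same field.
        if self._key_serializer is not None:
            raise ValueError("key serializer already set via another entry point")

    def set_kafka_key_serializer(self, s):      # entry point 1
        self._check_key_not_set()
        self._key_serializer = ("kafka", s)
        return self

    def set_key_serialization_schema(self, s):  # entry point 2
        self._check_key_not_set()
        self._key_serializer = ("schema", s)
        return self

    def set_header_provider(self, p):           # single entry point: no check
        self._header_provider = p
        return self
```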
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask
rkhachatryan commented on code in PR #21923: URL: https://github.com/apache/flink/pull/21923#discussion_r1163352135 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() { @Override public final void cancel() throws Exception { -isRunning = false; -canceled = true; +taskState.canceled = true; Review Comment: Shouldn't we also update `taskState.status` here? (ideally, in my view we should use either enum or a set of flags inside the `TaskState`, but not both) ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -1797,4 +1779,74 @@ public interface CanEmitBatchOfRecordsChecker { boolean check(); } + +/** Possible states of a Task. */ +private static class TaskState { +/** + * An enumeration of all states that a task can be in during its execution. Transitions + * usually follow the diagram bellow: + * + * {@code + * INITIALIZED -> RESTORING -> RUNNING -> FINISHED + * | | | + * | | | + * | V | + * | CANCELED -+ Review Comment: `CANCELED` state was removed from the code. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -1797,4 +1779,74 @@ public interface CanEmitBatchOfRecordsChecker { boolean check(); } + +/** Possible states of a Task. */ +private static class TaskState { +/** + * An enumeration of all states that a task can be in during its execution. Transitions + * usually follow the diagram bellow: + * + * {@code + * INITIALIZED -> RESTORING -> RUNNING -> FINISHED + * | | | + * | | | + * | V | + * | CANCELED -+ + * || + * ++ + * + * } + * + * Task enters {@code RESTORING} status before {@code RUNNING} to restore an invokable + * object from the last valid state, if any. + * + * Task enters {@code FINISHED} status through cleanup method, regardless of + * cancellations or if the previous call succeeded. 
+ * + * It is possible for a Task to be {@code failing} while being in any status e.g, failing + * == true while status is RUNNING + */ +private enum Status { +/** Task has successfully terminated. */ +FINISHED(), +/** The task is "in operation". */ +RUNNING(FINISHED), +/** The task is restoring during {@link #restore()}. */ +RESTORING(RUNNING, FINISHED), +/** Task constructor was called on init state. */ +INITIALIZED(RESTORING); + +Status[] transitions; + +Status(Status... transitions) { +this.transitions = transitions; +} +} + +/** + * Task is failing e.g., if an exception occurred inside {@link #invoke()}. Note that this + * can happen while Task is still in Running status. + */ +private volatile boolean failing = false; + +/** + * Task has been canceled. Note that can value can be true even while Task is still in + * Running status, e.g., canceled externally by TaskCanceler. + */ +private volatile boolean canceled = false; + +private volatile Status status = Status.INITIALIZED; + +final boolean transitionTo(Status newStatus) { +return Arrays.stream(status.transitions).anyMatch(newStatus::equals); +} Review Comment: According to this method name and usages, should it actually update the state? (rather than just returning boolean) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
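The reviewer's suggestion that `transitionTo` should both validate and update the status can be sketched as follows. This is a minimal illustration, not Flink's actual `StreamTask` code; the statuses and allowed transitions follow the diagram quoted in the diff, and `canceled` is kept as an orthogonal flag as described there.

```python
from enum import Enum

# Minimal sketch of a validating-and-updating transition method, per the
# review comment above (not the real Flink TaskState implementation).
class Status(Enum):
    INITIALIZED = "INITIALIZED"
    RESTORING = "RESTORING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"

ALLOWED = {
    Status.INITIALIZED: {Status.RESTORING},
    Status.RESTORING: {Status.RUNNING, Status.FINISHED},
    Status.RUNNING: {Status.FINISHED},
    Status.FINISHED: set(),
}

class TaskState:
    def __init__(self):
        self.status = Status.INITIALIZED
        self.canceled = False  # orthogonal flag: may be true in any status

    def transition_to(self, new_status: Status) -> bool:
        """Validate the move against ALLOWED and, if legal, apply it."""
        if new_status not in ALLOWED[self.status]:
            return False
        self.status = new_status
        return True
```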
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21833: [FLINK-30852][runtime] Checking task cancelation explicitly rather than failing the cancel method
rkhachatryan commented on code in PR #21833: URL: https://github.com/apache/flink/pull/21833#discussion_r1163338534 ## flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java: ## @@ -1300,9 +1304,7 @@ public TestInvokableCorrect(Environment environment) { public void invoke() {} @Override -public void cancel() { -fail("This should not be called"); Review Comment: nit: although this won't cause test failure, I think logging (unexpected) cancellation makes debugging and reading the code a bit easier
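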
[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #563: [FLINK-31716] Event UID field is missing the first time that an event…
morhidi commented on PR #563: URL: https://github.com/apache/flink-kubernetes-operator/pull/563#issuecomment-1504075103 +1 LGTM
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22377: [FLINK-31765][runtime][test] Disable changelog backend for RocksDB migration tests
rkhachatryan commented on code in PR #22377: URL: https://github.com/apache/flink/pull/22377#discussion_r1163316537 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java: ## @@ -136,6 +136,9 @@ public void testSavepoint() throws Exception { switch (snapshotSpec.getStateBackendType()) { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: env.setStateBackend(new EmbeddedRocksDBStateBackend()); +// disable changelog backend for now to ensure determinism in test data generation +// (see FLINK-31766) +env.enableChangelogStateBackend(false); Review Comment: nit: this change duplicates line 155, one can be removed nit: can be scoped to `executionMode == ExecutionMode.CREATE_SNAPSHOT` only
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #563: [FLINK-31716] Event UID field is missing the first time that an event…
morhidi commented on code in PR #563: URL: https://github.com/apache/flink-kubernetes-operator/pull/563#discussion_r1163279331 ## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java: ## @@ -72,6 +72,7 @@ public void accept(Event event) { .withName(eventName) .get(); Assertions.assertEquals(event.getMetadata().getUid(), eventConsumed.getMetadata().getUid()); Review Comment: This is redundant with the assertions below
[GitHub] [flink-connector-opensearch] reta commented on pull request #18: [hotfix] Use 1.0.0-1.16 baseline for API compatibility checks
reta commented on PR #18: URL: https://github.com/apache/flink-connector-opensearch/pull/18#issuecomment-1504021457 @MartijnVisser if you have time, super small one, thank you
[GitHub] [flink-kubernetes-operator] rodmeneses opened a new pull request, #563: [FLINK-31716] Event UID field is missing the first time that an event…
rodmeneses opened a new pull request, #563: URL: https://github.com/apache/flink-kubernetes-operator/pull/563 What is the purpose of the change Fixes a bug reported on https://issues.apache.org/jira/browse/FLINK-31716 where the event being consumed for the first time didn't have an UID field This ticket was reopened for two reasons: 1. Before the check was only being done when the event was created for the first time, and not on updates 2. We are now accepting the actual return value of the `.createOrReplace` call. Before we were only using the `uid` field of it. Brief change log Fixes a bug on EventUtils where the event consumed was missing the UID field Verifying this change This change modified existing unit tests. Does this pull request potentially affect one of the following parts: Dependencies (does it add or upgrade a dependency): no The public API, i.e., is any changes to the CustomResourceDescriptors: no Core observer or reconciler logic that is regularly executed: no Documentation Does this pull request introduce a new feature? no If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31743) Avoid relocating the RocksDB's log failure when filename exceeds 255 characters
[ https://issues.apache.org/jira/browse/FLINK-31743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711097#comment-17711097 ] Roman Khachatryan commented on FLINK-31743: --- [~assassinj] are you still planning to work on this issue? > Avoid relocating the RocksDB's log failure when filename exceeds 255 > characters > --- > > Key: FLINK-31743 > URL: https://issues.apache.org/jira/browse/FLINK-31743 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.16.1, 1.15.4 >Reporter: jinghaihang >Assignee: jinghaihang >Priority: Major > Fix For: 1.16.2, 1.18.0, 1.17.1 > > > Since FLINK-24785 , the file name of the rocksdb LOG is generated by parsing > the db path, when the db path is long and the filename exceeds 255 > characters, the creation of the file will fail, so the relevant rocksdb LOG > cannot be seen in the flink log dir. -- This message was sent by Atlassian Jira (v8.20.10#820010)
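One possible mitigation for the over-long derived file name described in FLINK-31743 can be sketched as follows. This is a hypothetical approach, not the actual Flink/FLINK-24785 code; the hash-based fallback name is an assumption made for illustration.

```python
import hashlib

# Hypothetical sketch: derive the RocksDB LOG file name from the db path,
# but fall back to a short hash-based name when the derived name would
# exceed the common 255-character filename limit.
MAX_FILENAME_LEN = 255

def rocksdb_log_file_name(db_path: str) -> str:
    derived = db_path.strip("/").replace("/", "_") + "_LOG"
    if len(derived) <= MAX_FILENAME_LEN:
        return derived
    digest = hashlib.sha256(db_path.encode()).hexdigest()[:16]
    return f"rocksdb_{digest}_LOG"

# A short path keeps the readable derived name; a very long path is replaced
# by a bounded hash-based name, so file creation cannot fail on name length.
assert rocksdb_log_file_name("/tmp/flink/job1/db") == "tmp_flink_job1_db_LOG"
assert len(rocksdb_log_file_name("/" + "x" * 300)) <= MAX_FILENAME_LEN
```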
[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled
[ https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergio Sainz updated FLINK-31775: - Description: When using native kubernetes deployment mode with high-availability (HA), and when new TaskManager pod is started to process a job, the TaskManager pod will attempt to register itself to the resource manager (JobManager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice in my case , it is not possible to disable istio as explained here: [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator and this is about native kubernetes deployment. 
was: When using native kubernetes deployment mode with high-availability, and when new TaskManager pod is started to process a job, the TaskManager pod will attempt to register itself to the resource manager (JobManager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice in my case , it is not possible to disable istio as explained here: [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator and this is about native kubernetes deployment. 
> High-Availability not supported in kubernetes when istio enabled > > > Key: FLINK-31775 > URL: https://issues.apache.org/jira/browse/FLINK-31775 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.16.1 >Reporter: Sergio Sainz >Priority: Major > > When using native kubernetes deployment mode with high-availability (HA), and > when new TaskManager pod is started to process a job, the TaskManager pod > will attempt to register itself to the resource manager (JobManager). the > TaskManager looks up the resource manager per ip-address > (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) > > Nevertheless when istio is enabled, the resolution by ip address is blocked, > and hence we see that the job cannot start because task manager cannot > register with the resource manager: > 2023-04-10 23:24:19,752 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not > resolve ResourceManager address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address >
[jira] [Reopened] (FLINK-31716) Event UID field is missing the first time that an event is consumed
[ https://issues.apache.org/jira/browse/FLINK-31716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi reopened FLINK-31716: > Event UID field is missing the first time that an event is consumed > --- > > Key: FLINK-31716 > URL: https://issues.apache.org/jira/browse/FLINK-31716 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Rodrigo Meneses >Assignee: Rodrigo Meneses >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.5.0 > > > on `EventUtils.createOrUpdateEvent` we use a `Consumer` instance to > `accept` the underlying event that is being created or updated. > The first time an event is created, we are calling > `client.resource(event).createOrReplace()` but we are discarding the return > value of such method, and we are returning the `event` that we just created, > which has an empty UID field. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled
[ https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergio Sainz updated FLINK-31775: - Description: When using native kubernetes deployment mode with high-availability, and when new TaskManager pod is started to process a job, the TaskManager pod will attempt to register itself to the resource manager (JobManager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice in my case , it is not possible to disable istio as explained here: [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator and this is about native kubernetes deployment. 
was: When using native kubernetes deployment mode, and when new TaskManager pod is started to process a job, the TaskManager pod will attempt to register itself to the resource manager (JobManager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice in my case , it is not possible to disable istio as explained here: [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator and this is about native kubernetes deployment. 
> High-Availability not supported in kubernetes when istio enabled > > > Key: FLINK-31775 > URL: https://issues.apache.org/jira/browse/FLINK-31775 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.16.1 >Reporter: Sergio Sainz >Priority: Major > > When using native kubernetes deployment mode with high-availability, and when > new TaskManager pod is started to process a job, the TaskManager pod will > attempt to register itself to the resource manager (JobManager). the > TaskManager looks up the resource manager per ip-address > (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) > > Nevertheless when istio is enabled, the resolution by ip address is blocked, > and hence we see that the job cannot start because task manager cannot > register with the resource manager: > 2023-04-10 23:24:19,752 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not > resolve ResourceManager address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address >
[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled
[ https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergio Sainz updated FLINK-31775: - Description: When using native kubernetes deployment mode, and when new TaskManager pod is started to process a job, the TaskManager pod will attempt to register itself to the resource manager (JobManager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice in my case , it is not possible to disable istio as explained here: [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator and this is about native kubernetes deployment. 
was: When using native kubernetes deployment mode, and when new TaskManager pod is started to process a job, the TaskManager pod will attempt to register itself to the resource manager (JobManager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice in my case , it is not possible to disable istio as explained here: [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator. 
> High-Availability not supported in kubernetes when istio enabled > > > Key: FLINK-31775 > URL: https://issues.apache.org/jira/browse/FLINK-31775 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.16.1 >Reporter: Sergio Sainz >Priority: Major > > When using native kubernetes deployment mode, and when new TaskManager pod is > started to process a job, the TaskManager pod will attempt to register itself > to the resource manager (JobManager). the TaskManager looks up the resource > manager per ip-address > (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) > > Nevertheless when istio is enabled, the resolution by ip address is blocked, > and hence we see that the job cannot start because task manager cannot > register with the resource manager: > 2023-04-10 23:24:19,752 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not > resolve ResourceManager address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. > > Notice that when HA is disabled, the resolution of the resource manager
[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled
[ https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergio Sainz updated FLINK-31775: - Description: When using native kubernetes deployment mode, and when new TaskManager pod is started to process a job, the TaskManager pod will attempt to register itself to the resource manager (JobManager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice in my case , it is not possible to disable istio as explained here: [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html] Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator. was: When using native kubernetes deployment mode, and when new TaskManager is started to process a job, the TaskManager will attempt to register itself to the resource manager (job manager). 
the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://fl...@local-mci-ar32a-dev-flink-cluster.mstr-env-mci-ar32a-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice it is not possible to disable istio (as explained here : https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html) Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator. > High-Availability not supported in kubernetes when istio enabled > > > Key: FLINK-31775 > URL: https://issues.apache.org/jira/browse/FLINK-31775 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.16.1 >Reporter: Sergio Sainz >Priority: Major > > When using native kubernetes deployment mode, and when new TaskManager pod is > started to process a job, the TaskManager pod will attempt to register itself > to the resource manager (JobManager). 
The TaskManager looks up the resource > manager by IP address > (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) > > Nevertheless, when Istio is enabled, resolution by IP address is blocked, > and hence the job cannot start because the TaskManager cannot > register with the resource manager: > 2023-04-10 23:24:19,752 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not > resolve ResourceManager address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in > 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. > > Notice that when HA is disabled, the resolution of the resource manager is > made by service name and so the resource manager can be found
[jira] [Created] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled
Sergio Sainz created FLINK-31775: Summary: High-Availability not supported in kubernetes when istio enabled Key: FLINK-31775 URL: https://issues.apache.org/jira/browse/FLINK-31775 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.16.1 Reporter: Sergio Sainz When using native Kubernetes deployment mode, and when a new TaskManager is started to process a job, the TaskManager will attempt to register itself with the resource manager (job manager). The TaskManager looks up the resource manager by IP address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1). Nevertheless, when Istio is enabled, resolution by IP address is blocked, so the job cannot start because the TaskManager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resource manager is resolved by service name, and so it can be found: 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://fl...@local-mci-ar32a-dev-flink-cluster.mstr-env-mci-ar32a-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice it is not possible to disable Istio (as explained here: https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html). Although similar to https://issues.apache.org/jira/browse/FLINK-28171, I am logging this as a separate defect because I believe the fix for FLINK-28171 won't cover this case; FLINK-28171 is about the Flink Kubernetes Operator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] MartijnVisser closed pull request #22371: [FLINK-28171] [flink-kubernates] enable add appProtocol via the configuration and verify it is not overridden by Default port defintion
MartijnVisser closed pull request #22371: [FLINK-28171] [flink-kubernates] enable add appProtocol via the configuration and verify it is not overridden by Default port defintion URL: https://github.com/apache/flink/pull/22371 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-elasticsearch] MartijnVisser commented on pull request #59: [FLINK-31696] ElasticSearch nightly CI failure
MartijnVisser commented on PR #59: URL: https://github.com/apache/flink-connector-elasticsearch/pull/59#issuecomment-1503913242 Nightly triggered at https://github.com/apache/flink-connector-elasticsearch/actions/runs/4670944068 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-17508) Develop Oracle Catalog
[ https://issues.apache.org/jira/browse/FLINK-17508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-17508: -- Assignee: (was: RocMarshal) > Develop Oracle Catalog > -- > > Key: FLINK-17508 > URL: https://issues.apache.org/jira/browse/FLINK-17508 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: Flavio Pompermaier >Priority: Major > > Similarly to https://issues.apache.org/jira/browse/FLINK-16471 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31696) ElasticSearch nightly CI failure
[ https://issues.apache.org/jira/browse/FLINK-31696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-31696. -- Fix Version/s: elasticsearch-4.0.0 Resolution: Fixed Fixed in main: 85f0f4057a6b241a7d9b0ae2996ac1147e0e2428 > ElasticSearch nightly CI failure > > > Key: FLINK-31696 > URL: https://issues.apache.org/jira/browse/FLINK-31696 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / ElasticSearch >Reporter: Danny Cranmer >Assignee: Andriy Redko >Priority: Major > Labels: pull-request-available > Fix For: elasticsearch-4.0.0 > > > Investigate and fix the nightly CI failure. Example > [https://github.com/apache/flink-connector-elasticsearch/actions/runs/4585918498/jobs/8098357503] > > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) > on project flink-connector-elasticsearch-base: Execution default-test of goal > org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: > org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' > failed to discover tests: 'java.lang.Object > com.tngtech.archunit.lang.syntax.elements.MethodsThat.areAnnotatedWith(java.lang.Class)' > -> [Help 1] {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-elasticsearch] MartijnVisser merged pull request #59: [FLINK-31696] ElasticSearch nightly CI failure
MartijnVisser merged PR #59: URL: https://github.com/apache/flink-connector-elasticsearch/pull/59 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-elasticsearch] reta commented on pull request #59: [FLINK-31696] ElasticSearch nightly CI failure
reta commented on PR #59: URL: https://github.com/apache/flink-connector-elasticsearch/pull/59#issuecomment-1503855950 > @reta Can you include setting the version for the connector to 4.0-SNAPSHOT, given that we'll drop support for Flink 1.16 with this PR? @MartijnVisser it was done already, https://github.com/apache/flink-connector-elasticsearch/blob/main/pom.xml#L32 :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate
flinkbot commented on PR #22381: URL: https://github.com/apache/flink/pull/22381#issuecomment-1503852904 ## CI report: * d4ac5891e842a21920525a5ed5f0ccce5206718c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31763) Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate
[ https://issues.apache.org/jira/browse/FLINK-31763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31763: --- Labels: pull-request-available (was: ) > Ensure that the total number of requested buffers does not exceed poolSize + > maxOverdraftBuffersPerGate > --- > > Key: FLINK-31763 > URL: https://issues.apache.org/jira/browse/FLINK-31763 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.1 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > > As we discussed in FLINK-31610, new buffers can be requested only when > "{_}numOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments < > poolSize + maxOverdraftBuffersPerGate"{_}. > Consider such a scenario, the {{{}CurrentPoolSize = 5{}}}, > {{{}numOfRequestedMemorySegments = 7{}}}, {{{}maxOverdraftBuffersPerGate = > 2{}}}. If {{{}numberOfRequestedOverdraftMemorySegments = 0{}}}, then 2 > buffers can be requested now. > We should convert {{numberOfRequestedMemorySegments}} to > {{numberOfRequestedOverdraftMemorySegments}} when poolSize is decreased. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa opened a new pull request, #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate
reswqa opened a new pull request, #22381: URL: https://github.com/apache/flink/pull/22381

## What is the purpose of the change

*As we discussed in [FLINK-31610](https://issues.apache.org/jira/browse/FLINK-31610), new buffers can be requested only when `numOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments < poolSize + maxOverdraftBuffersPerGate`.*

*Consider such a scenario: `CurrentPoolSize = 5`, `numOfRequestedMemorySegments = 7`, `maxOverdraftBuffersPerGate = 2`. If `numberOfRequestedOverdraftMemorySegments = 0`, then `2` buffers can be requested now.*

*We should convert `numberOfRequestedMemorySegments` to `numberOfRequestedOverdraftMemorySegments` when poolSize is decreased.*

## Brief change log

- *Convert `numberOfRequestedMemorySegments` to `numberOfRequestedOverdraftMemorySegments` when poolSize is decreased.*

## Verifying this change

Add unit test in `LocalBufferPoolTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes (but per-buffer)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
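The invariant described in this PR can be illustrated with a small standalone sketch. This is not Flink's actual `LocalBufferPool` (whose fields, locking, and segment handling are more involved); the class and method names below are simplified stand-ins, and the re-labeling step in `setPoolSize` mirrors the conversion the PR proposes:

```java
/**
 * Minimal sketch of the buffer-accounting invariant discussed above.
 * NOT Flink's LocalBufferPool; all names are simplified stand-ins.
 * Invariant: requested + overdraft < poolSize + maxOverdraftBuffersPerGate
 * must hold before one more buffer is granted.
 */
class BufferPoolSketch {
    private int poolSize;
    private final int maxOverdraftBuffersPerGate;
    private int numRequested; // ordinary buffers handed out
    private int numOverdraft; // overdraft buffers handed out

    BufferPoolSketch(int poolSize, int maxOverdraftBuffersPerGate) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
    }

    /** Returns true if one more buffer may be handed out. */
    boolean tryRequest() {
        if (numRequested + numOverdraft >= poolSize + maxOverdraftBuffersPerGate) {
            return false; // total cap reached
        }
        if (numRequested < poolSize) {
            numRequested++; // ordinary request
        } else {
            numOverdraft++; // overdraft request
        }
        return true;
    }

    /**
     * When the pool shrinks, re-label already-granted ordinary buffers that
     * exceed the new pool size as overdraft buffers, so the total cap
     * poolSize + maxOverdraftBuffersPerGate is still enforced.
     */
    void setPoolSize(int newPoolSize) {
        if (numRequested > newPoolSize) {
            numOverdraft += numRequested - newPoolSize;
            numRequested = newPoolSize;
        }
        this.poolSize = newPoolSize;
    }

    int totalGranted() {
        return numRequested + numOverdraft;
    }
}
```

With the ticket's scenario (7 buffers granted, pool shrunk to 5, overdraft cap 2), converting the two excess requested buffers into overdraft buffers makes the next request fail, instead of wrongly allowing two more.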
[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163104861 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { +default Headers produceHeaders(IN input) { +return new RecordHeaders(new ArrayList<>()); Review Comment: @AlexAxeman see my latest comment above about the default case being `headerProvider` is null. In that case, we no longer need / want to return `Optional`. i.e. 
``` interface HeaderProvider { Headers getHeaders(IN input); // no default implementation } ``` and in `KafkaRecordSerializationSchemaWrapper#serialize()`: ``` if (headerProvider != null) { return new ProducerRecord( ..., headerProvider.getHeaders(input) ); } else { return new ProducerRecord(...); // no headers } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
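The nullable-provider shape discussed in this review thread can be sketched without a Kafka dependency. All names below (`HeaderProviderSketch`, `RecordSketch`, `SerializerSketch`) are hypothetical stand-ins, and a plain `Map<String, byte[]>` stands in for Kafka's `Headers` type; only the null-check branching mirrors the suggestion above:

```java
import java.io.Serializable;
import java.util.Map;

/**
 * Stand-in for the proposed HeaderProvider: one abstract method,
 * no default implementation, so "no headers" is modeled by the
 * provider reference itself being null.
 */
interface HeaderProviderSketch<IN> extends Serializable {
    Map<String, byte[]> getHeaders(IN input);
}

/** Record with optional headers, mimicking ProducerRecord's two shapes. */
class RecordSketch<IN> {
    final IN value;
    final Map<String, byte[]> headers; // null means "no headers attached"

    RecordSketch(IN value, Map<String, byte[]> headers) {
        this.value = value;
        this.headers = headers;
    }
}

/** Mirrors the null-check branch proposed for the serialization wrapper. */
class SerializerSketch<IN> {
    private final HeaderProviderSketch<IN> headerProvider; // nullable

    SerializerSketch(HeaderProviderSketch<IN> headerProvider) {
        this.headerProvider = headerProvider;
    }

    RecordSketch<IN> serialize(IN input) {
        if (headerProvider != null) {
            return new RecordSketch<>(input, headerProvider.getHeaders(input));
        }
        return new RecordSketch<>(input, null); // no headers
    }
}
```

Because the interface has a single abstract method, callers can supply a lambda, and the default (no provider configured) needs no `Optional` or empty-collection convention.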
[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
AlexAxeman commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163100734 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { +default Headers produceHeaders(IN input) { +return new RecordHeaders(new ArrayList<>()); Review Comment: Just reading up on your second comment. Yes, I think that would be better than. The RecordProducer also handles this gracefully. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
AlexAxeman commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163098741 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { +default Headers produceHeaders(IN input) { +return new RecordHeaders(new ArrayList<>()); Review Comment: Hm... Personally I don't really like that. The big thing about Optionals is to force the handling of possible `null` values. You don't need that protecting while returning collections, as you can always return an empty one, which is what I did. I'd much rather go for an empty (immutable) collection. 
Initially, I returned an `Iterable` and changed that to the Kafka API `Headers` upon request. I'm not really leaning to either side, so happy to change that back. @RamanVerma, wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163096185 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { +default Headers produceHeaders(IN input) { +return new RecordHeaders(new ArrayList<>()); Review Comment: Actually - on second thought, the default behavior / case can simply be that `headerProvider` is `null`. For example, `keySerializer` is nullable for a `KafkaRecordSerializationSchema` - the same can apply for the `headerProvider`. In this case, we don't need a default implementation for the `HeaderProvider#getProvider` method any more. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] mohsenrezaeithe commented on pull request #22302: [hotfix] [docs] Fix `ROW()` decleration syntax
mohsenrezaeithe commented on PR #22302: URL: https://github.com/apache/flink/pull/22302#issuecomment-1503771608 @wuchong could you please approve this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta opened a new pull request, #18: [hotfix] Use 1.0.0-1.16 baseline for API compatibility checks
reta opened a new pull request, #18: URL: https://github.com/apache/flink-connector-opensearch/pull/18 Use 1.0.0-1.16 baseline for API compatibility checks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163083277 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { +default Headers produceHeaders(IN input) { +return new RecordHeaders(new ArrayList<>()); Review Comment: I was thinking about `Optional getHeaders(IN input)`, and default is returning `Optional.empty()`. I'm also wondering if it's possible / makes sense to NOT expose the Kafka `Headers` class on the public API. Although Kafka API seems to be rather stable, in general its best if we can avoid exposing our dependencies' classes on the public interface. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31696) ElasticSearch nightly CI failure
[ https://issues.apache.org/jira/browse/FLINK-31696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31696: --- Labels: pull-request-available (was: ) > ElasticSearch nightly CI failure > > > Key: FLINK-31696 > URL: https://issues.apache.org/jira/browse/FLINK-31696 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / ElasticSearch >Reporter: Danny Cranmer >Assignee: Andriy Redko >Priority: Major > Labels: pull-request-available > > Investigate and fix the nightly CI failure. Example > [https://github.com/apache/flink-connector-elasticsearch/actions/runs/4585918498/jobs/8098357503] > > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) > on project flink-connector-elasticsearch-base: Execution default-test of goal > org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: > org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' > failed to discover tests: 'java.lang.Object > com.tngtech.archunit.lang.syntax.elements.MethodsThat.areAnnotatedWith(java.lang.Class)' > -> [Help 1] {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-elasticsearch] reta opened a new pull request, #59: [FLINK-31696] ElasticSearch nightly CI failure
reta opened a new pull request, #59: URL: https://github.com/apache/flink-connector-elasticsearch/pull/59 ElasticSearch nightly CI failure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
reta commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1163061352 ## .github/workflows/release.yml: ## @@ -24,7 +24,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16.1, 1.17.0] +flink: [1.17.0] Review Comment: No worries at all -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions
[ https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711040#comment-17711040 ] Aitozi commented on FLINK-18027: [~libenchao] do you know why the explicit ROW constructor was removed from the doc? (The git history of the file seems not to be trackable.) Is it discouraged to use the explicit ROW call? If not, I think we should add it back. > ROW value constructor cannot deal with complex expressions > -- > > Key: FLINK-18027 > URL: https://issues.apache.org/jira/browse/FLINK-18027 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Benchao Li >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > {code:java} > create table my_source ( > my_row row > ) with (...); > create table my_sink ( > my_row row > ) with (...); > insert into my_sink > select ROW(my_row.a, my_row.b) > from my_source;{code} > will throw exceptions: > {code:java} > Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL > parse failed. Encountered "." at line 1, column 18.Exception in thread "main" > org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered > "." at line 1, column 18.Was expecting one of: ")" ... "," ... at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627) > at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused > by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line > 1, column 18.Was expecting one of: ")" ... "," ... 
at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at > org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: > Encountered "." at line 1, column 18.Was expecting one of: ")" ... "," > ... at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658) > at > 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) > ... 5 more > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-17508) Develop Oracle Catalog
[ https://issues.apache.org/jira/browse/FLINK-17508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-17508: --- Summary: Develop Oracle Catalog (was: Develop OracleCatalog) > Develop Oracle Catalog > -- > > Key: FLINK-17508 > URL: https://issues.apache.org/jira/browse/FLINK-17508 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: Flavio Pompermaier >Assignee: RocMarshal >Priority: Major > > Similarly to https://issues.apache.org/jira/browse/FLINK-16471
[jira] [Closed] (FLINK-29672) Support oracle catalog
[ https://issues.apache.org/jira/browse/FLINK-29672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-29672. -- Resolution: Duplicate > Support oracle catalog > --- > > Key: FLINK-29672 > URL: https://issues.apache.org/jira/browse/FLINK-29672 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: waywtdcc >Priority: Major > > Support oracle catalog
[GitHub] [flink-shaded] MartijnVisser commented on a diff in pull request #120: [FLINK-31719][Build/Shaded] Update Netty to 4.1.91.Final
MartijnVisser commented on code in PR #120: URL: https://github.com/apache/flink-shaded/pull/120#discussion_r1163023881 ## pom.xml: ## @@ -355,6 +355,20 @@ under the License. + + +org.cyclonedx +cyclonedx-maven-plugin Review Comment: Yeah this shouldn't have been here. Removed
[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
tzulitai commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163021963 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { Review Comment: > it does not assign the headers, but produces them In that case, how about `HeaderProvider`? `Provider` seems to be a more canonical name for this. And correspondingly, the single method would be called `getHeaders(IN input)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
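Reading the thread above, the proposed rename amounts to a small functional interface. Below is a minimal sketch of what that could look like; the names `HeaderProvider` and `getHeaders` are taken from the review comment, while the `Header` class here is a simplified stand-in for Kafka's `org.apache.kafka.common.header.Header` and `HeaderProviderSketch` is a hypothetical demo class, not part of the connector:

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

/** Simplified stand-in for org.apache.kafka.common.header.Header (key/value pair). */
final class Header {
    private final String key;
    private final byte[] value;

    Header(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    String key() { return key; }
    byte[] value() { return value; }
}

/** The rename proposed in the review: provides the headers for each outgoing element. */
@FunctionalInterface
interface HeaderProvider<IN> extends Serializable {
    Iterable<Header> getHeaders(IN input);
}

public class HeaderProviderSketch {
    public static void main(String[] args) {
        // Derive a single header from the element itself, e.g. for tracing.
        HeaderProvider<String> provider =
                element -> Collections.singletonList(
                        new Header("source-id", element.getBytes(StandardCharsets.UTF_8)));
        for (Header h : provider.getHeaders("order-42")) {
            System.out.println(h.key() + "=" + new String(h.value(), StandardCharsets.UTF_8));
        }
    }
}
```

Presumably the sink's record serializer would invoke such a provider once per element before building the `ProducerRecord`; the exact wiring is up to the PR.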
[jira] [Commented] (FLINK-31774) Add document for delete and update statement
[ https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711033#comment-17711033 ] Aitozi commented on FLINK-31774: I think we should add a description for these two statements, similar to the INSERT statement. CC [~luoyuxia] I'm willing to work on this. > Add document for delete and update statement > > > Key: FLINK-31774 > URL: https://issues.apache.org/jira/browse/FLINK-31774 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aitozi >Priority: Major > > I do not find the declaration about the usage of DELETE and UPDATE statement > in the SQL section.
[jira] [Updated] (FLINK-31774) Add document for delete and update statement
[ https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31774: --- Parent: FLINK-30648 Issue Type: Sub-task (was: Improvement)
[GitHub] [flink-connector-opensearch] MartijnVisser commented on pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
MartijnVisser commented on PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#issuecomment-1503642528 > may I ask you please to trigger nightly action manually https://github.com/apache/flink-connector-opensearch/actions/runs/4669419298 is started!
[GitHub] [flink-connector-opensearch] MartijnVisser commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
MartijnVisser commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1163012687 ## .github/workflows/release.yml: ## @@ -24,7 +24,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16.1, 1.17.0] +flink: [1.17.0] Review Comment: > I'll fix this afterwards :) I'm an idiot and got confused by Flink 1.16.1, there's no Flink 1.17.1 yet
[jira] [Created] (FLINK-31774) Add document for delete and update statement
Aitozi created FLINK-31774: -- Summary: Add document for delete and update statement Key: FLINK-31774 URL: https://issues.apache.org/jira/browse/FLINK-31774 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Aitozi I could not find documentation about the usage of the DELETE and UPDATE statements in the SQL section.
[jira] [Commented] (FLINK-31760) COALESCE() with NULL arguments throws error
[ https://issues.apache.org/jira/browse/FLINK-31760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711022#comment-17711022 ] Mohsen Rezaei commented on FLINK-31760: --- [~lincoln.86xy], if I understand correctly, the error message is a generic SQL validation thrown for similar uses in the SQL, e.g. [{{testValuesWithoutTypeCoercion}}|https://github.com/mohsenrezaeithe/flink/blob/7dd42cf22e85ce2e9dbbde5210edd65bcb94f459/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/NullTypeTest.java#L47-L47]. Are you suggesting to improve the error at the root, or wrap it somewhere in the stack and add a better message? I'll also go ahead and add the {{CAST(NULL AS VARCHAR)}} note in the docs for clarity. > COALESCE() with NULL arguments throws error > --- > > Key: FLINK-31760 > URL: https://issues.apache.org/jira/browse/FLINK-31760 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.16.1, 1.18.0 > Environment: Flink 1.16.1 >Reporter: Mohsen Rezaei >Priority: Major > > All arguments may not be nullable: > {code} > SELECT COALESCE(NULL, NULL) FROM UnnamedTable$0 > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 1, column 17 to line 1, column 20: Illegal > use of 'NULL' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703) > at CoalesceTest.main(CoalesceTest.java:58) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 17 to line 1, column 20: Illegal use of 'NULL' > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at > java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1837) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1912) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:419) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061) > at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347) > at > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975) > at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182) > ... 5 more > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Illegal use > of 'NULL' > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at >
[GitHub] [flink-connector-opensearch] reta commented on pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
reta commented on PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#issuecomment-1503618001 > LGTM Thanks a lot, may I ask you please to trigger nightly action manually when you have an opportunity, thank you (once it is green, I will replicate changes to Elasticsearch connector)
[GitHub] [flink-connector-opensearch] MartijnVisser merged pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
MartijnVisser merged PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16
[GitHub] [flink-connector-opensearch] MartijnVisser commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
MartijnVisser commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162981670 ## .github/workflows/release.yml: ## @@ -24,7 +24,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16.1, 1.17.0] +flink: [1.17.0] Review Comment: I'll fix this afterwards :)
[jira] [Commented] (FLINK-31760) COALESCE() with NULL arguments throws error
[ https://issues.apache.org/jira/browse/FLINK-31760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711014#comment-17711014 ] Mohsen Rezaei commented on FLINK-31760: --- Hi [~lemonjing], thanks for the quick response. I'm interested in providing a fix, but I need to dig a bit and see how to best address the issue, and familiarize myself with that part of the code. Let me know if you have any pointers.
[jira] (FLINK-31760) COALESCE() with NULL arguments throws error
[ https://issues.apache.org/jira/browse/FLINK-31760 ] Mohsen Rezaei deleted comment on FLINK-31760: --- was (Author: JIRAUSER298550): Hi [~lemonjing], thanks for the quick response. I'm interested in providing a fix, but I need to dig a bit and see how to best address the issue, and familiarize myself with that part of the code. Let me know if you have any pointers.
[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition
[ https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711002#comment-17711002 ] Aitozi commented on FLINK-31533: [~luoyuxia] I think it's a useful feature and I'd like to support this. Can I take this ticket? > CREATE TABLE AS SELECT should support to define partition > - > > Key: FLINK-31533 > URL: https://issues.apache.org/jira/browse/FLINK-31533 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: luoyuxia >Priority: Major >
[GitHub] [flink] godfreyhe commented on a diff in pull request #22376: [FLINK-31767][table-planner] Improve the implementation for "analyze table" execution on partitioned table
godfreyhe commented on code in PR #22376: URL: https://github.com/apache/flink/pull/22376#discussion_r1162913583 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java: ## @@ -137,13 +138,46 @@ private static CatalogColumnStatistics mergeColumnStatistics( if (!columns.isEmpty()) { columnStat = convertToColumnStatistics(row, columns); } -return new Tuple2<>(tableStat, columnStat); +return new StatisticsWrapper(tableStat, columnStat); +} + +private static Map executeSqlAndGenerateStatisticsForMultiParts( +TableEnvironmentImpl tableEnv, +List columns, +String statSql, +int partitionCount) { +TableResult tableResult = tableEnv.executeSql(statSql); Review Comment: Thanks for the reminder
[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
AlexAxeman commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1162890942 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { +default Headers produceHeaders(IN input) { +return new RecordHeaders(new ArrayList<>()); Review Comment: Yeah, thought about that. So it should be a `Collections.emptyList()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
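For readers following along, here is a self-contained sketch of the reviewer's suggestion: let the default implementation return a shared immutable empty collection instead of allocating a fresh `ArrayList` per record. The `Header` interface below is a minimal stand-in for Kafka's `org.apache.kafka.common.header.Header` so the sketch compiles without the kafka-clients dependency; the real default method in the PR returns Kafka's `Headers` type via `RecordHeaders`.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

public class HeaderSketch {
    // Minimal stand-in for org.apache.kafka.common.header.Header, used here
    // only so the sketch compiles without the kafka-clients dependency.
    interface Header {
        String key();
        byte[] value();
    }

    // Shape of the proposed interface: one default method per input element.
    interface HeaderProducer<IN> extends Serializable {
        // Collections.emptyList() returns a shared immutable instance, so no
        // new ArrayList is allocated for every record that has no headers.
        default List<Header> produceHeaders(IN input) {
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        HeaderProducer<String> none = new HeaderProducer<String>() {};
        System.out.println(none.produceHeaders("any-record").size()); // prints 0
    }
}
```

With Kafka's actual types, the equivalent would be `new RecordHeaders()` (the no-argument constructor), which likewise avoids the per-record list allocation.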
[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
AlexAxeman commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1162889963 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { Review Comment: Strictly, it does not assign the headers, but produces them. If you're concerned about the naming conflict, how about HeaderCreator? (not that I like that very much...). I'm happy to rename it to HeaderAssigner otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink-connector-pulsar] nikolasten commented on pull request #16: [BK-3.0][FLINK-30552][Connector/Pulsar] drop batch message size assertion, better set the cursor position.
nikolasten commented on PR #16: URL: https://github.com/apache/flink-connector-pulsar/pull/16#issuecomment-1503435012 Disregard my latest comment. When used with Flink 1.17 it works fine. If anybody else has this problem and is waiting for a release, we used the snapshot version with Flink 1.17:
```gradle
maven { url "https://repository.apache.org/content/repositories/snapshots" }
implementation "org.apache.flink:flink-connector-pulsar:4.0-20230404.065406-2"
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup
zhipeng93 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1162850039 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java: ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.datastream.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.CoGroupTaskIterator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; +import 
org.apache.flink.util.TraversableOnceException; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * An operator that implements the co-group logic. + * + * @param The class type of the first input. + * @param The class type of the second input. + * @param The class type of the key. + * @param The class type of the output values. + */ +public class CoGroupOperator +extends AbstractUdfStreamOperator> +implements TwoInputStreamOperator, BoundedMultiInput { + +private PushSorter>> sorterA; +private PushSorter>> sorterB; +private TypeComparator>> comparatorA; +private TypeComparator>> comparatorB; +private KeySelector keySelectorA; +private KeySelector keySelectorB; +private TypeSerializer>> keyAndValueSerializerA; +private TypeSerializer>> keyAndValueSerializerB; +private TypeSerializer keySerializer; +private DataOutputSerializer dataOutputSerializer; +private long lastWatermarkTimestamp = Long.MIN_VALUE; +private int remainingInputNum = 2; + +public CoGroupOperator(CoGroupFunction function) { +super(function); +} + +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader(); +MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager(); +IOManager ioManager = containingTask.getEnvironment().getIOManager(); + +keySelectorA = config.getStatePartitioner(0, userCodeClassLoader); +keySelectorB = config.getStatePartitioner(1, userCodeClassLoader); +keySerializer = config.getStateKeySerializer(userCodeClassLoader); +int keyLength = keySerializer.getLength(); + +TypeSerializer typeSerializerA = config.getTypeSerializerIn(0,
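The operator above implements a sort-based co-group: both inputs are spilled and sorted by key, then a single merge pass hands each distinct key's two value groups to the `CoGroupFunction`. As a self-contained illustration of that merge step (plain Java, deliberately simplified — not Flink's actual iterator classes such as `NonReusingSortMergeCoGroupIterator`):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class CoGroupSketch {
    /** Merge two key-sorted lists of (key, value) pairs and invoke the
     *  co-group callback once per distinct key with both value groups. */
    static <K extends Comparable<K>, V, R> List<R> sortMergeCoGroup(
            List<Map.Entry<K, V>> a, List<Map.Entry<K, V>> b,
            BiFunction<List<V>, List<V>, R> fn) {
        List<R> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() || j < b.size()) {
            K key; // smallest key still present on either side
            if (i >= a.size()) {
                key = b.get(j).getKey();
            } else if (j >= b.size()) {
                key = a.get(i).getKey();
            } else {
                K ka = a.get(i).getKey(), kb = b.get(j).getKey();
                key = ka.compareTo(kb) <= 0 ? ka : kb;
            }
            // Collect this key's run from each side; one side may be empty.
            List<V> groupA = new ArrayList<>(), groupB = new ArrayList<>();
            while (i < a.size() && a.get(i).getKey().equals(key)) groupA.add(a.get(i++).getValue());
            while (j < b.size() && b.get(j).getKey().equals(key)) groupB.add(b.get(j++).getValue());
            out.add(fn.apply(groupA, groupB));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> a = List.of(
                new SimpleEntry<>(1, "a"), new SimpleEntry<>(2, "b"));
        List<Map.Entry<Integer, String>> b = List.of(
                new SimpleEntry<>(2, "x"), new SimpleEntry<>(3, "y"));
        // One callback per distinct key: 1 -> ([a],[]), 2 -> ([b],[x]), 3 -> ([],[y])
        List<String> merged = sortMergeCoGroup(a, b, (ga, gb) -> ga + "|" + gb);
        System.out.println(merged); // [[a]|[], [b]|[x], []|[y]]
    }
}
```

The real operator additionally manages Flink managed memory, external spilling via `ExternalSorter`, and watermark handling, which this sketch omits.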
[GitHub] [flink] Aitozi commented on pull request #20246: [FLINK-28074][table-planner] show statistics details for DESCRIBE EXTENDED
Aitozi commented on PR #20246: URL: https://github.com/apache/flink/pull/20246#issuecomment-1503403992 I think it's a very useful auxiliary command; can we move this PR forward, @swuferhong @godfreyhe? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Aitozi commented on a diff in pull request #22376: [FLINK-31767][table-planner] Improve the implementation for "analyze table" execution on partitioned table
Aitozi commented on code in PR #22376: URL: https://github.com/apache/flink/pull/22376#discussion_r1162852786 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java: ## @@ -137,13 +138,46 @@ private static CatalogColumnStatistics mergeColumnStatistics( if (!columns.isEmpty()) { columnStat = convertToColumnStatistics(row, columns); } -return new Tuple2<>(tableStat, columnStat); +return new StatisticsWrapper(tableStat, columnStat); +} + +private static Map executeSqlAndGenerateStatisticsForMultiParts( +TableEnvironmentImpl tableEnv, +List columns, +String statSql, +int partitionCount) { +TableResult tableResult = tableEnv.executeSql(statSql); Review Comment: An empty partition will generate an empty SQL statement here, which fails the CI tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reswqa commented on pull request #17: [hotfix] Add opensearch.yml to enable flink to find the data
reswqa commented on PR #17: URL: https://github.com/apache/flink-connector-opensearch/pull/17#issuecomment-1503369425 In fact, I prefer this approach, which is what I proposed [here](https://issues.apache.org/jira/browse/FLINK-31704?focusedCommentId=17707954&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17707954). But let's wait for @dannycranmer's opinion again. If we can reach an agreement, I am willing to create a ticket to do this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
reta commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162830988 ## .github/workflows/release.yml: ## @@ -24,7 +24,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16.1, 1.17.0] +flink: [1.17.0] Review Comment: Moved it back to 1.17.0 ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
reta commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162829801 ## .github/workflows/release.yml: ## @@ -24,7 +24,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16.1, 1.17.0] +flink: [1.17.0] Review Comment: Hm ... there is no 1.17.1 yet ..., ``` Run wget -q -c https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz -O - | tar -xz wget -q -c https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz -O - | tar -xz shell: /usr/bin/bash -e {0} env: MVN_COMMON_OPTIONS: -U -B --no-transfer-progress -Dflink.version=1.17.1 MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 FLINK_CACHE_DIR: /tmp/cache/flink MVN_BUILD_OUTPUT_FILE: /tmp/mvn_build_output.out MVN_VALIDATION_DIR: /tmp/flink-validation-deployment JAVA_HOME: /opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/8.0.362-9/x64 JAVA_HOME_8_X64: /opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/8.0.362-9/x64 MVN_DEPENDENCY_CONVERGENCE: -Dflink.convergence.phase=install -Pcheck-convergence binary_url: https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz cache_binary: true gzip: stdin: unexpected end of file tar: Child returned status 1 tar: Error is not recoverable: exiting now Error: Process completed with exit code 2. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dmvk commented on a diff in pull request #22364: [FLINK-31744] Include JobVertex info in sparse EG
dmvk commented on code in PR #22364: URL: https://github.com/apache/flink/pull/22364#discussion_r1162826386 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -195,6 +195,34 @@ void testArchivedCheckpointingSettingsNotNullIfCheckpointingIsEnabled() throws E ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph); } +@Test +void testArchivedJobVerticesPresent() throws Exception { +final JobGraph jobGraph = createJobGraph(); +jobGraph.setSnapshotSettings( +new JobCheckpointingSettings( +CheckpointCoordinatorConfiguration.builder().build(), null)); + +final ArchivedExecutionGraph archivedExecutionGraph = +new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor) +.build(EXECUTOR_RESOURCE.getExecutor()) +.getArchivedExecutionGraph(JobStatus.INITIALIZING, null); + +ArchivedExecutionJobVertex jobVertex = +archivedExecutionGraph.getJobVertex(JOB_VERTEX.getID()); +assertThat(jobVertex) +.isNotNull() +.satisfies( +archived -> { +assertThat(archived.getParallelism()) +.isEqualTo(JOB_VERTEX.getParallelism()); +// JOB_VERTEX.maxP == -1, but we want the actual maxP determined by the +// scheduler + assertThat(archived.getMaxParallelism()).isEqualTo(128); +}); Review Comment: nit: ```suggestion assertThat(jobVertex).isNotNull(); assertThat(jobVertex.getParallelism()).isEqualTo(JOB_VERTEX.getParallelism()); // JOB_VERTEX.maxP == -1, but we want the actual maxP determined by the scheduler assertThat(jobVertex.getMaxParallelism()).isEqualTo(128); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31655) Adaptive Channel selection for partitioner
[ https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710956#comment-17710956 ] Anton Kalashnikov commented on FLINK-31655: --- Hi [~tartarus], thanks for this discussion. I think I remember the same discussion which [~gaoyunhaii] mentioned above. I just want to add that I tried to implement it as well; you can take a look at it here - https://github.com/apache/flink/pull/16224. My solution works without any extra synchronization or locks, but the performance of the other (non-adaptive) rebalances was impacted anyway. I changed only `BufferWritingResultPartition#emitRecord` and added a non-volatile variable to `PipelinedSubpartition`, which was enough to cause a small degradation in the benchmark since it is a hot path. I don't remember exactly why we stopped discussing it, but we should be careful here: adaptive rebalance is not such a common usage, and it would be unfortunate if we slowed down the more common rebalances in favor of this less common one. > Adaptive Channel selection for partitioner > -- > > Key: FLINK-31655 > URL: https://issues.apache.org/jira/browse/FLINK-31655 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: tartarus >Assignee: tartarus >Priority: Major > > In Flink, if the upstream and downstream operator parallelism is not the > same, then by default the RebalancePartitioner will be used to select the > target channel. > In our company, users often use Flink to access Redis, HBase or other RPC > services. If some of the operators are slow to return requests (for external > service reasons), then because Rebalance/Rescale use a round-robin channel > selection policy, the job easily suffers from backpressure. > Because the Rebalance/Rescale policy does not care which downstream subtask the > data is sent to, we expect Rebalance/Rescale to consider the processing > capacity of the downstream subtasks when choosing a channel. 
> Sending more data to the free subtasks ensures the best possible throughput > of the job! > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
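To make the proposal concrete, below is a self-contained sketch of one load-aware selection policy, the "power of two choices" heuristic: sample two channels and route to the one with fewer in-flight records. This is an illustration only — it is not Flink's `ChannelSelector` API, the `LoadAwareSelector` class and its feedback method are hypothetical, and it deliberately ignores the synchronization and hot-path concerns raised in the comment above.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Illustration only — NOT Flink's ChannelSelector API: "power of two
 *  choices" sampling, picking the less-loaded of two random channels. */
public class LoadAwareSelector {
    private final AtomicIntegerArray inFlight;

    public LoadAwareSelector(int numChannels) {
        this.inFlight = new AtomicIntegerArray(numChannels);
    }

    /** Sample two channels and route to the one with fewer in-flight records. */
    public int selectChannel() {
        int n = inFlight.length();
        int a = ThreadLocalRandom.current().nextInt(n);
        int b = ThreadLocalRandom.current().nextInt(n);
        int pick = inFlight.get(a) <= inFlight.get(b) ? a : b;
        inFlight.incrementAndGet(pick);
        return pick;
    }

    /** Feedback path (hypothetical): called when a downstream subtask drains a record. */
    public void onRecordConsumed(int channel) {
        inFlight.decrementAndGet(channel);
    }

    public int inFlightCount(int channel) {
        return inFlight.get(channel);
    }

    public static void main(String[] args) {
        LoadAwareSelector selector = new LoadAwareSelector(4);
        // Channel 0 never drains (a "slow" subtask): new records drift away from it.
        for (int i = 0; i < 1000; i++) {
            int c = selector.selectChannel();
            if (c != 0) {
                selector.onRecordConsumed(c); // fast subtasks drain immediately
            }
        }
        System.out.println("records stuck on slow channel 0: " + selector.inFlightCount(0));
    }
}
```

A real implementation would need a cheap signal of downstream load (e.g. credit/backlog information) rather than a per-record counter, which is exactly where the benchmark regression discussed above comes from.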
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
reta commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162826956 ## .github/workflows/release.yml: ## @@ -24,7 +24,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16.1, 1.17.0] +flink: [1.17.0] Review Comment: Missed that, updated both to `1.17.1`. I think the snapshots (e.g. `1.18-SNAPSHOT`) are only for weekly builds, since this is a very unstable target. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] MartijnVisser commented on pull request #17: [hotfix] Add opensearch.yml to enable flink to find the data
MartijnVisser commented on PR #17: URL: https://github.com/apache/flink-connector-opensearch/pull/17#issuecomment-1503352188 > So my main question is, if we modify a major bug in the document, do we have to wait for the next connector release so that users can see the correct version doc. I don't think we should. I think we should update Flink so that the references point to the branch, not the tag. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] MartijnVisser commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
MartijnVisser commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162816269 ## .github/workflows/release.yml: ## @@ -24,7 +24,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16.1, 1.17.0] +flink: [1.17.0] Review Comment: I actually think you need to update `.github/workflows/push_pr.yml` to set the Flink version either to a matrix (with Flink 1.17.1 and potentially Flink 1.18-SNAPSHOT), or at least that version should be updated to 1.17.1 :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reswqa commented on pull request #17: [hotfix] Add opensearch.yml to enable flink to find the data
reswqa commented on PR #17: URL: https://github.com/apache/flink-connector-opensearch/pull/17#issuecomment-1503347392 I know `--branch` can point to both branch names and tags. If I understand correctly, we only move tags like v3.0.0 forward during each release. So my main question is: if we fix a major bug in the documentation, do we have to wait for the next connector release before users can see the corrected docs? If we use v1.0.0 in setup-docs.sh, will we have this problem? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix
reta commented on code in PR #16: URL: https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162810174 ## .github/workflows/weekly.yml: ## @@ -26,7 +26,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: -flink: [1.16-SNAPSHOT, 1.17-SNAPSHOT, 1.18-SNAPSHOT] Review Comment: Thanks a lot @MartijnVisser , rebased, I think we are good to go -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-22727) SerializableAvroSchema cannot handle large schema
[ https://issues.apache.org/jira/browse/FLINK-22727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Serge Travin resolved FLINK-22727. -- Resolution: Duplicate > SerializableAvroSchema cannot handle large schema > - > > Key: FLINK-22727 > URL: https://issues.apache.org/jira/browse/FLINK-22727 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.13.0 >Reporter: Serge Travin >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > The issue is very similar to the > https://issues.apache.org/jira/browse/FLINK-19491 > Here is the stack trace for the problem. > > > {{Caused by: java.lang.RuntimeException: Could not serialize serializer into > the configuration.Caused by: java.lang.RuntimeException: Could not serialize > serializer into the configuration. at > org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.writeParametersToConfig(RuntimeSerializerFactory.java:61) > ~[flink-core-1.13.0.jar:1.13.0] at > org.apache.flink.runtime.operators.util.TaskConfig.setTypeSerializerFactory(TaskConfig.java:1211) > ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.runtime.operators.util.TaskConfig.setOutputSerializer(TaskConfig.java:594) > ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.connectJobVertices(JobGraphGenerator.java:1336) > ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.translateChannel(JobGraphGenerator.java:820) > ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:664) > ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] ... 
15 moreCaused by: > java.io.UTFDataFormatException at > java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2164) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2007) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeUTF(ObjectOutputStream.java:869) > ~[?:1.8.0_202] at > org.apache.flink.formats.avro.typeutils.SerializableAvroSchema.writeObject(SerializableAvroSchema.java:55) > ~[flink-avro-1.13.0.jar:1.13.0] at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_202] at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_202] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_202] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_202] at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > ~[?:1.8.0_202] at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > ~[?:1.8.0_202] at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) > ~[flink-core-1.13.0.jar:1.13.0] at > org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:546) > ~[flink-core-1.13.0.jar:1.13.0] at > 
org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.writeParametersToConfig(RuntimeSerializerFactory.java:59) > ~[flink-core-1.13.0.jar:1.13.0] at > org.apache.flink.runtime.operators.util.TaskConfig.setTypeSerializerFactory(TaskConfig.java:1211) > ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.runtime.operators.util.TaskConfig.setOutputSerializer(TaskConfig.java:594) > ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.connectJobVertices(JobGraphGenerator.java:1336) > ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.translateChannel(JobGraphGenerator.java:820) > ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:664) > ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] ... 15 more}} > -- This message was sent by Atlassian Jira
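The `UTFDataFormatException` in the stack trace comes from `ObjectOutputStream.writeUTF`, which stores the encoded string length in an unsigned 16-bit field, so any schema string whose modified-UTF-8 encoding exceeds 65535 bytes cannot be written. The sketch below demonstrates the failure and the usual workaround — a length-prefixed `byte[]` (`writeInt` + `write`), which has no such limit. This is the style of fix applied for the duplicate ticket FLINK-19491; the method names here are illustrative, not Flink's actual API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.nio.charset.StandardCharsets;

public class LargeSchemaSketch {
    // Write the schema as a length-prefixed UTF-8 byte array: writeInt uses a
    // full 32-bit length, so schemas far larger than 64 KiB round-trip fine.
    static byte[] write(String schemaJson) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            byte[] bytes = schemaJson.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);
            out.write(bytes);
        }
        return bos.toByteArray();
    }

    static String read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100_000; i++) sb.append('x'); // stand-in "large schema"
        String bigSchema = sb.toString();

        try {
            // writeUTF's 16-bit length field overflows for >65535 encoded bytes.
            new DataOutputStream(new ByteArrayOutputStream()).writeUTF(bigSchema);
        } catch (UTFDataFormatException e) {
            System.out.println("writeUTF failed as expected");
        }
        System.out.println(read(write(bigSchema)).equals(bigSchema)); // prints true
    }
}
```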