[jira] [Comment Edited] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711200#comment-17711200
 ] 

Aitozi edited comment on FLINK-18027 at 4/12/23 5:43 AM:
-

The content is modified

from 

[https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md]

to

[https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]
 

 

I think the content below is removed accidently.  CC [~sjwiesman] 

 
{code:java}
-- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code}
 

I think we should add this back, since the explicit ROW constructor's 
limitation has been solved


was (Author: aitozi):
The content is modified

from 

[https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md]

to

[https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]
 

 

I think the content below is removed accidently. 

 
{code:java}
-- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code}
 

 

I think we should add this back, since the explicit ROW constructor's 
limitation has been solved

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw excepions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> 

[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711200#comment-17711200
 ] 

Aitozi commented on FLINK-18027:


The content is modified

from 

[https://github.com/apache/flink/blob/177310ebe3d552ec71a1f1f97e0207cf30b6efed/docs/dev/table/functions/systemFunctions.md]

to

[https://github.com/apache/flink/blob/a5372e0c92c7a0f465beba5e5204b07769cd92e6/docs/data/sql_functions.yml]
 

 

I think the content below is removed accidently. 

 
{code:java}
-- explicit ROW constructor ROW(value1 [, value2]*) {% endhighlight %} {code}
 

 

I think we should add this back, since the explicit ROW constructor's 
limitation has been solved

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw excepions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>  at 

[GitHub] [flink] Aitozi commented on a diff in pull request #22378: [FLINK-31344][planner] Support to update nested columns in update sta…

2023-04-11 Thread via GitHub


Aitozi commented on code in PR #22378:
URL: https://github.com/apache/flink/pull/22378#discussion_r1162597631


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml:
##
@@ -20,17 +20,17 @@ limitations under the License.
 
   

[GitHub] [flink] Aitozi commented on a diff in pull request #22378: [FLINK-31344][planner] Support to update nested columns in update sta…

2023-04-11 Thread via GitHub


Aitozi commented on code in PR #22378:
URL: https://github.com/apache/flink/pull/22378#discussion_r1162702918


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml:
##
@@ -20,17 +20,17 @@ limitations under the License.
 
   

[GitHub] [flink-ml] lindong28 opened a new pull request, #232: [hotfix] Update pattern matching in test_ml_lib_completeness to work with release branch

2023-04-11 Thread via GitHub


lindong28 opened a new pull request, #232:
URL: https://github.com/apache/flink-ml/pull/232

   ## What is the purpose of the change
   
   Update pattern matching in test_ml_lib_completeness to work with release 
branch
   
   ## Brief change log
   
   Updated test_ml_lib_completeness.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled

2023-04-11 Thread Weihua Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711185#comment-17711185
 ] 

Weihua Hu commented on FLINK-31775:
---

Thanks [~sergiosp] report this. 

IMHO, this is the same issue as FLINK-28171.
After FLINK-28171 is fixed, you can define the appProtocol  in podTemplate both 
for kubernetes-operator and native kubernetes.

> High-Availability not supported in kubernetes when istio enabled
> 
>
> Key: FLINK-31775
> URL: https://issues.apache.org/jira/browse/FLINK-31775
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.16.1
>Reporter: Sergio Sainz
>Priority: Major
>
> When using native kubernetes deployment mode with high-availability (HA), and 
> when new TaskManager pod is started to process a job, the TaskManager pod 
> will attempt to register itself to the resource manager (JobManager). the 
> TaskManager looks up the resource manager per ip-address 
> (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)
>  
> Nevertheless when istio is enabled, the resolution by ip address is blocked, 
> and hence we see that the job cannot start because task manager cannot 
> register with the resource manager:
> 2023-04-10 23:24:19,752 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
> resolve ResourceManager address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
> 1 ms: Could not connect to rpc endpoint under address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.
>  
> Notice that when HA is disabled, the resolution of the resource manager is 
> made by service name and so the resource manager can be found
>  
> 2023-04-11 00:49:34,162 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
> registration at resource manager 
> akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
>  under registration id 83ad942597f86aa880ee96f1c2b8b923.
>  
> Notice in my case , it is not possible to disable istio as explained here: 
> [https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]
>  
> Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , 
> logging as separate defect as I believe the fix of FLINK-28171 won't fix this 
> case. FLINK-28171  is about Flink Kubernetes Operator and this is about 
> native kubernetes deployment.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-04-11 Thread via GitHub


pgaref commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1163517899


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() {
 
 @Override
 public final void cancel() throws Exception {
-isRunning = false;
-canceled = true;
+taskState.canceled = true;

Review Comment:
   Hey @rkhachatryan , thanks for taking another look!
   
   Just wanted to maintain the existing logic where both flags (running and 
canceled are true) but I agree having to maintain both enum and flags is 
confusing --  thinking it over looks like we only need to protect from invalid 
transitions in general and more specifically here from: CANCELED to RUNNING as 
added -- WDYT?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-04-11 Thread via GitHub


pgaref commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1163517899


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() {
 
 @Override
 public final void cancel() throws Exception {
-isRunning = false;
-canceled = true;
+taskState.canceled = true;

Review Comment:
   Hey @rkhachatryan , thanks for taking another look!
   
   Just wanted to maintain the existing logic where both flags (running and 
canceled are true) but I agree having both enum and states is confusing --  
thinking it over looks like we only need to protect from invalid transitions in 
general and more specifically here from: CANCELED to RUNNING as added -- WDYT?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-04-11 Thread via GitHub


pgaref commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1163517899


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() {
 
 @Override
 public final void cancel() throws Exception {
-isRunning = false;
-canceled = true;
+taskState.canceled = true;

Review Comment:
   Hey @rkhachatryan , thanks for taking another look!
   
   Just wanted to maintain the existing logic where both flags (running and 
canceled are true) but I agree having both enum and states is confusing --  
thinking it over looks like we only need to protect from the invalid 
transition: Canceled to Running as we recently added -- WDYT?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-04-11 Thread via GitHub


pgaref commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1163517899


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() {
 
 @Override
 public final void cancel() throws Exception {
-isRunning = false;
-canceled = true;
+taskState.canceled = true;

Review Comment:
   Hey @rkhachatryan , thanks for taking another look!
   
   Just wanted to maintain the existing logic where both flags (running and 
canceled are true) but thinking it over looks like we only need to protect from 
the invalid transition: Canceled to Running -- WDYT?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-04-11 Thread via GitHub


pgaref commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1163517899


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() {
 
 @Override
 public final void cancel() throws Exception {
-isRunning = false;
-canceled = true;
+taskState.canceled = true;

Review Comment:
   Hey @rkhachatryan , thanks for taking another look!
   
   Just wanted to maintain the existing logic where both flags (running and 
canceled are true) but thinking it over looks like we only need to protect from 
the invalid transition: Canceled to Running (already enforced) -- WDYT?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew

2023-04-11 Thread Liam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liam updated FLINK-31762:
-
Description: 
To simplify the demonstration, let us assume that there are two topics, and 
each topic has four partitions. We have set the parallelism to eight to consume 
these two topics. However, the current partition assignment method may lead to 
some subtasks being assigned two partitions while others are left with none.

!image-2023-04-11-08-00-16-054.png|width=500,height=143!

In my case, the situation is even worse as I have ten topics, each with 100 
partitions. If I set the parallelism to 1000, some slots may be assigned seven 
partitions while others remain unassigned.

To address this issue, I propose a new partition assignment solution. In this 
approach, round-robin assignment takes place between all topics, not just one.

For example, the ideal assignment for the case mentioned above is presented 
below:

 

!https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!

This new solution can also handle cases where each topic has more partitions.

!image-2023-04-11-08-12-24-115.png|width=444,height=127!

Let us work together to reach a consensus on this proposal. Thank you!

 

FYI: how the partition be assigned currently
{code:java}
public class KafkaTopicPartitionAssigner {    
    public static int assign(KafkaTopicPartition partition, int 
numParallelSubtasks) {
        return assign(partition.getTopic(), partition.getPartition(), 
numParallelSubtasks);
    }    public static int assign(String topic, int partition, int 
numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % 
numParallelSubtasks;        // here, the assumption is that the id of Kafka 
partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset 
clockwise from the
        // start index
        return (startIndex + partition) % numParallelSubtasks;
    }
 {code}
for Kafka Source, it's implemented in the KafkaSourceEnumerator as below
{code:java}
    static int getSplitOwner(TopicPartition tp, int numReaders) {
        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFF) % 
numReaders;        // here, the assumption is that the id of Kafka partitions 
are always ascending
        // starting from 0, and therefore can be used directly as the offset 
clockwise from the
        // start index
        return (startIndex + tp.partition()) % numReaders;
    } {code}
 

 

  was:
To simplify the demonstration, let us assume that there are two topics, and 
each topic has four partitions. We have set the parallelism to eight to consume 
these two topics. However, the current partition assignment method may lead to 
some subtasks being assigned two partitions while others are left with none.

!image-2023-04-11-08-00-16-054.png|width=500,height=143!

In my case, the situation is even worse as I have ten topics, each with 100 
partitions. If I set the parallelism to 1000, some slots may be assigned seven 
partitions while others remain unassigned.

To address this issue, I propose a new partition assignment solution. In this 
approach, round-robin assignment takes place between all topics, not just one.

For example, the ideal assignment for the case mentioned above is presented 
below:

 

!https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!

This new solution can also handle cases where each topic has more partitions.

!image-2023-04-11-08-12-24-115.png|width=444,height=127!

Let us work together to reach a consensus on this proposal. Thank you!

 

FYI: how the partition be assigned currently
{code:java}
public class KafkaTopicPartitionAssigner {    
    public static int assign(KafkaTopicPartition partition, int 
numParallelSubtasks) {
        return assign(partition.getTopic(), partition.getPartition(), 
numParallelSubtasks);
    }    public static int assign(String topic, int partition, int 
numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % 
numParallelSubtasks;        // here, the assumption is that the id of Kafka 
partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset 
clockwise from the
        // start index
        return (startIndex + partition) % numParallelSubtasks;
    }
 {code}
 

 


> Subscribe to multiple Kafka topics may cause partition assignment skew
> --
>
> Key: FLINK-31762
> URL: https://issues.apache.org/jira/browse/FLINK-31762
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.18.0
>Reporter: Liam
>Priority: Major
> Attachments: image-2023-04-11-08-00-16-054.png, 
> 

[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

2023-04-11 Thread via GitHub


jiangxin369 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1161619708


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
 }
 }
 
+StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+for (int i = 0; i < inputs.length; ++i) {
+if (inputs[i] instanceof NetworkInputConfig) {
+TypeSerializer typeSerializerIn =
+((NetworkInputConfig) inputs[i]).getTypeSerializer();
+if (typeSerializerIn instanceof IterationRecordSerializer) {

Review Comment:
   Yes, you're right. I removed the  `if ... instanceOf ...` check. And seems 
that we did not modify the `StateKeySerializer` at all, so I also removed this 
part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on pull request #231: [FLINK-31374] ProxyStreamPartitioner should implement ConfigurableStreamPartitioner

2023-04-11 Thread via GitHub


jiangxin369 commented on PR #231:
URL: https://github.com/apache/flink-ml/pull/231#issuecomment-1504431951

   @zhipeng93 Thanks for the review, I've updated the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement

2023-04-11 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711166#comment-17711166
 ] 

lincoln lee commented on FLINK-31301:
-

[~aitozi] thanks for fixing this! I'll try to find some time to take a look.

> Unsupported nested columns in column list of insert statement
> -
>
> Key: FLINK-31301
> URL: https://issues.apache.org/jira/browse/FLINK-31301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently an error will be raised when use nested columns in column list of 
> insert statement, e.g.,
> {code:java}
> INSERT INTO nested_type_sink (a,b.b1,c.c2,f)
> SELECT a,b.b1,c.c2,f FROM nested_type_src
> {code}
>  
> {code:java}
> java.lang.AssertionError
>     at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-11 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711161#comment-17711161
 ] 

luoyuxia edited comment on FLINK-31533 at 4/12/23 1:27 AM:
---

[~aitozi] Thanks for voluntering,  but please remember with the syntax that 
proposed in 
[FLIP-218|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185]]
 
{code:java}
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]  {code}
, we can't define partition. We need to extend the syntax which may need a 
FLIP. Also, please remember to consider FLINK-31534 while extending the syntax.


was (Author: luoyuxia):
[~aitozi] Thanks for voluntering,  but please remember with the syntax that 
proposed in 
[FLIP-218|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185],]
 
{code:java}
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]  {code}
, we can't define partition. We need to extend the syntax which may need a 
FLIP. Also, please remember to consider FLINK-31534 while extending the syntax.

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-11 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711161#comment-17711161
 ] 

luoyuxia edited comment on FLINK-31533 at 4/12/23 1:27 AM:
---

[~aitozi] Thanks for voluntering,  but please remember with the syntax that 
proposed in

[FLIP-218|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185]]
{code:java}
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]  {code}
, we can't define partition. We need to extend the syntax which may need a 
FLIP. Also, please remember to consider FLINK-31534 while extending the syntax.


was (Author: luoyuxia):
[~aitozi] Thanks for voluntering,  but please remember with the syntax that 
proposed in 
[FLIP-218|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185]]
 
{code:java}
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]  {code}
, we can't define partition. We need to extend the syntax which may need a 
FLIP. Also, please remember to consider FLINK-31534 while extending the syntax.

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-11 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711161#comment-17711161
 ] 

luoyuxia commented on FLINK-31533:
--

[~aitozi] Thanks for voluntering,  but please remember with the syntax that 
proposed in 
[FLIP-218|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185],]
 
{code:java}
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]  {code}
, we can't define partition. We need to extend the syntax which may need a 
FLIP. Also, please remember to consider FLINK-31534 while extending the syntax.

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-31774:


Assignee: Aitozi

> Add document for delete and update statement
> 
>
> Key: FLINK-31774
> URL: https://issues.apache.org/jira/browse/FLINK-31774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
>
> I do not find the declaration about the usage of DELETE and UPDATE statement 
> in the SQL section. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711159#comment-17711159
 ] 

luoyuxia commented on FLINK-31774:
--

[~aitozi] Thanks for raising it and voluntering. I miss to add these statements 
since current no connector had integrated the delete & update api . But I think 
it worths a document since Flink itself supports such synatax. Assign to you 
now, please ping me to review after the pr is ready.

> Add document for delete and update statement
> 
>
> Key: FLINK-31774
> URL: https://issues.apache.org/jira/browse/FLINK-31774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> I do not find the declaration about the usage of DELETE and UPDATE statement 
> in the SQL section. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-11 Thread via GitHub


lindong28 commented on code in PR #230:
URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163455375


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java:
##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.datastream.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * An operator that implements the co-group logic.
+ *
+ * @param  The class type of the first input.
+ * @param  The class type of the second input.
+ * @param  The class type of the key.
+ * @param  The class type of the output values.
+ */
+public class CoGroupOperator
+extends AbstractUdfStreamOperator>
+implements TwoInputStreamOperator, BoundedMultiInput {
+
+private PushSorter>> sorterA;
+private PushSorter>> sorterB;
+private TypeComparator>> comparatorA;
+private TypeComparator>> comparatorB;
+private KeySelector keySelectorA;
+private KeySelector keySelectorB;
+private TypeSerializer>> 
keyAndValueSerializerA;
+private TypeSerializer>> 
keyAndValueSerializerB;
+private TypeSerializer keySerializer;
+private DataOutputSerializer dataOutputSerializer;
+private long lastWatermarkTimestamp = Long.MIN_VALUE;
+private int remainingInputNum = 2;
+
+public CoGroupOperator(CoGroupFunction function) {
+super(function);
+}
+
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+ClassLoader userCodeClassLoader = 
containingTask.getUserCodeClassLoader();
+MemoryManager memoryManager = 
containingTask.getEnvironment().getMemoryManager();
+IOManager ioManager = containingTask.getEnvironment().getIOManager();
+
+keySelectorA = config.getStatePartitioner(0, userCodeClassLoader);
+keySelectorB = config.getStatePartitioner(1, userCodeClassLoader);
+keySerializer = config.getStateKeySerializer(userCodeClassLoader);
+int keyLength = keySerializer.getLength();
+
+TypeSerializer typeSerializerA = config.getTypeSerializerIn(0, 

[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-11 Thread via GitHub


lindong28 commented on code in PR #230:
URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163453972


##
flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java:
##
@@ -59,6 +64,54 @@ public void testMapPartition() throws Exception {
 new int[] {5, 5, 5, 5}, 
counts.stream().mapToInt(Integer::intValue).toArray());
 }
 
+@Test
+public void testCoGroup() throws Exception {
+DataStream> data1 =
+env.fromCollection(
+Arrays.asList(Tuple2.of(1, 1), Tuple2.of(2, 2), 
Tuple2.of(3, 3)));
+DataStream> data2 =
+env.fromCollection(
+Arrays.asList(
+Tuple2.of(1, 1.5),
+Tuple2.of(5, 5.5),
+Tuple2.of(3, 3.5),
+Tuple2.of(1, 2.5)));
+DataStream result =
+DataStreamUtils.coGroup(
+data1,
+data2,
+(KeySelector, Integer>) tuple 
-> tuple.f0,
+(KeySelector, Integer>) tuple 
-> tuple.f0,
+BasicTypeInfo.DOUBLE_TYPE_INFO,
+new CoGroupFunction<
+Tuple2, Tuple2, Double>() {
+@Override
+public void coGroup(
+Iterable> 
iterableA,
+Iterable> 
iterableB,
+Collector collector) {
+List> valuesA =
+
IteratorUtils.toList(iterableA.iterator());
+List> valuesB =
+
IteratorUtils.toList(iterableB.iterator());
+
+double sum = 0;
+for (Tuple2 value : valuesA) 
{
+sum += value.f1;
+}
+for (Tuple2 value : valuesB) {
+sum += value.f1;
+}
+collector.collect(sum);
+}
+});
+
+List resultValues = 
IteratorUtils.toList(result.executeAndCollect());
+double[] resultPrimitiveValues =
+
resultValues.stream().mapToDouble(Double::doubleValue).toArray();
+assertArrayEquals(new double[] {5.0, 2.0, 6.5, 5.5}, 
resultPrimitiveValues, 1e-5);

Review Comment:
   Yes, I think this assert should always work.
   
   Can you explain why the result of this particular co-group does not preserve 
order? 
   
   Note that `StreamExecutionEnvironment#fromCollection` returns a DataStream 
with parallelism=1. And therefore the co-group operator is executed with 
parallelism=1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-11 Thread via GitHub


lindong28 commented on code in PR #230:
URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163453972


##
flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java:
##
@@ -59,6 +64,54 @@ public void testMapPartition() throws Exception {
 new int[] {5, 5, 5, 5}, 
counts.stream().mapToInt(Integer::intValue).toArray());
 }
 
+@Test
+public void testCoGroup() throws Exception {
+DataStream> data1 =
+env.fromCollection(
+Arrays.asList(Tuple2.of(1, 1), Tuple2.of(2, 2), 
Tuple2.of(3, 3)));
+DataStream> data2 =
+env.fromCollection(
+Arrays.asList(
+Tuple2.of(1, 1.5),
+Tuple2.of(5, 5.5),
+Tuple2.of(3, 3.5),
+Tuple2.of(1, 2.5)));
+DataStream result =
+DataStreamUtils.coGroup(
+data1,
+data2,
+(KeySelector, Integer>) tuple 
-> tuple.f0,
+(KeySelector, Integer>) tuple 
-> tuple.f0,
+BasicTypeInfo.DOUBLE_TYPE_INFO,
+new CoGroupFunction<
+Tuple2, Tuple2, Double>() {
+@Override
+public void coGroup(
+Iterable> 
iterableA,
+Iterable> 
iterableB,
+Collector collector) {
+List> valuesA =
+
IteratorUtils.toList(iterableA.iterator());
+List> valuesB =
+
IteratorUtils.toList(iterableB.iterator());
+
+double sum = 0;
+for (Tuple2 value : valuesA) 
{
+sum += value.f1;
+}
+for (Tuple2 value : valuesB) {
+sum += value.f1;
+}
+collector.collect(sum);
+}
+});
+
+List resultValues = 
IteratorUtils.toList(result.executeAndCollect());
+double[] resultPrimitiveValues =
+
resultValues.stream().mapToDouble(Double::doubleValue).toArray();
+assertArrayEquals(new double[] {5.0, 2.0, 6.5, 5.5}, 
resultPrimitiveValues, 1e-5);

Review Comment:
   Can you explain why the result of this particular co-group does not preserve 
order? Note that `StreamExecutionEnvironment#fromCollection` returns a 
DataStream with parallelism=1. And therefore the co-group operator is executed 
with parallelism=1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-11 Thread via GitHub


lindong28 commented on code in PR #230:
URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163453277


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java:
##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.datastream.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * An operator that implements the co-group logic.
+ *
+ * @param  The class type of the first input.
+ * @param  The class type of the second input.
+ * @param  The class type of the key.
+ * @param  The class type of the output values.
+ */
+public class CoGroupOperator
+extends AbstractUdfStreamOperator>
+implements TwoInputStreamOperator, BoundedMultiInput {
+
+private PushSorter>> sorterA;

Review Comment:
   The main reason is to reduce the amount of work in this PR that will be 
discarded in the future.
   
   In order to change the type from `StreamRecord` to `IN1`, we would need 
to update the classes `BytesKeyNormalizationUtil`, 
`FixedLengthByteKeyComparator`, `KeyAndValueSerializer` and 
`VariableLengthByteKeyComparator` so that they don't rely on `StreamRecord`.
   
   The above four classes are copied and pasted from apache/flink. And we will 
likely remove them from Flink ML in the future after we optimize the cogroup 
API in apache/flink. And I am not sure the amount of work described above will 
be useful later.
   
   Given that the overhead of DataStream is roughly 12.3% more than than 
DataSet, and the performance difference will likely be negligible for 
non-trivial algorithms such as ALS, I find it reasonable to just use the 
StreamRecord here as the short term approach and spend time on the long term 
approach (i.e. optimizing apache/flink).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-11 Thread via GitHub


lindong28 commented on code in PR #230:
URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163450592


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java:
##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.datastream.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * An operator that implements the co-group logic.
+ *
+ * @param  The class type of the first input.
+ * @param  The class type of the second input.
+ * @param  The class type of the key.
+ * @param  The class type of the output values.
+ */
+public class CoGroupOperator
+extends AbstractUdfStreamOperator>
+implements TwoInputStreamOperator, BoundedMultiInput {
+
+private PushSorter>> sorterA;
+private PushSorter>> sorterB;
+private TypeComparator>> comparatorA;
+private TypeComparator>> comparatorB;
+private KeySelector keySelectorA;
+private KeySelector keySelectorB;
+private TypeSerializer>> 
keyAndValueSerializerA;
+private TypeSerializer>> 
keyAndValueSerializerB;
+private TypeSerializer keySerializer;
+private DataOutputSerializer dataOutputSerializer;
+private long lastWatermarkTimestamp = Long.MIN_VALUE;
+private int remainingInputNum = 2;
+
+public CoGroupOperator(CoGroupFunction function) {
+super(function);
+}
+
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+ClassLoader userCodeClassLoader = 
containingTask.getUserCodeClassLoader();
+MemoryManager memoryManager = 
containingTask.getEnvironment().getMemoryManager();
+IOManager ioManager = containingTask.getEnvironment().getIOManager();
+
+keySelectorA = config.getStatePartitioner(0, userCodeClassLoader);
+keySelectorB = config.getStatePartitioner(1, userCodeClassLoader);
+keySerializer = config.getStateKeySerializer(userCodeClassLoader);
+int keyLength = keySerializer.getLength();
+
+TypeSerializer typeSerializerA = config.getTypeSerializerIn(0, 

[GitHub] [flink-ml] lindong28 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-11 Thread via GitHub


lindong28 commented on code in PR #230:
URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163449957


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java:
##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.datastream.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * An operator that implements the co-group logic.
+ *
+ * @param  The class type of the first input.
+ * @param  The class type of the second input.
+ * @param  The class type of the key.
+ * @param  The class type of the output values.
+ */
+public class CoGroupOperator
+extends AbstractUdfStreamOperator>
+implements TwoInputStreamOperator, BoundedMultiInput {
+
+private PushSorter>> sorterA;
+private PushSorter>> sorterB;
+private TypeComparator>> comparatorA;
+private TypeComparator>> comparatorB;
+private KeySelector keySelectorA;
+private KeySelector keySelectorB;
+private TypeSerializer>> 
keyAndValueSerializerA;
+private TypeSerializer>> 
keyAndValueSerializerB;
+private TypeSerializer keySerializer;
+private DataOutputSerializer dataOutputSerializer;
+private long lastWatermarkTimestamp = Long.MIN_VALUE;
+private int remainingInputNum = 2;
+
+public CoGroupOperator(CoGroupFunction function) {
+super(function);
+}
+
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+ClassLoader userCodeClassLoader = 
containingTask.getUserCodeClassLoader();
+MemoryManager memoryManager = 
containingTask.getEnvironment().getMemoryManager();
+IOManager ioManager = containingTask.getEnvironment().getIOManager();
+
+keySelectorA = config.getStatePartitioner(0, userCodeClassLoader);
+keySelectorB = config.getStatePartitioner(1, userCodeClassLoader);
+keySerializer = config.getStateKeySerializer(userCodeClassLoader);
+int keyLength = keySerializer.getLength();
+
+TypeSerializer typeSerializerA = config.getTypeSerializerIn(0, 

[GitHub] [flink-connector-kafka] tzulitai commented on pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1504292430

   Merged via 
https://github.com/apache/flink-connector-kafka/commit/a7785630e714af303b224c38d9a6caa89a551265
   
   I squashed my changes along with your comment. Please let me know if you 
find anything odd and would like me to address. Thanks again!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai closed pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai closed pull request #18: [FLINK-31049] [flink-connector-kafka]Add 
support for Kafka record headers to KafkaSink
URL: https://github.com/apache/flink-connector-kafka/pull/18


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1504282565

   @AlexAxeman thanks a lot for your contribution! +1 LGTM.
   
   Since there's other work that is sort of depending on this feature, I'll 
proceed to merge this PR and address my own comments while merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-28171) Adjust Job and Task manager port definitions to work with Istio+mTLS

2023-04-11 Thread Sergio Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710375#comment-17710375
 ] 

Sergio Sainz edited comment on FLINK-28171 at 4/11/23 11:46 PM:


Hello [~wangyang0918] ! also experiencing this issue with native kubernetes 
deployment with HA enabled (not in flink k8s operator).

In [https://lists.apache.org/thread/yl40s9069wksz66qlf9t6jhmwsn59zft] you 
mentioned that "If HA enabled, the internal jobmanager rpc service will not be 
created. Instead, the TaskManager retrieves the JobManager address via HA 
services and connects to it via pod ip."

Do you know whether we can change the way TaskManager connects to HA services: 
do not use ip address and instead use pod name?

I think we cannot add the akka workaround of bypassing the istio sidecar. 
Alternatively, do you know how to add TLS encryption to this channel 
(TaskManager->HA service) manually? Thanks for the info ~

 


was (Author: sergiosp):
Hello [~wangyang0918] ! also experiencing this issue with native kubernetes 
deployment with HA enabled (not in flink k8s operator).

In [https://lists.apache.org/thread/yl40s9069wksz66qlf9t6jhmwsn59zft] you 
mentioned that "If HA enabled, the internal jobmanager rpc service will not be 
created. Instead, the TaskManager retrieves the JobManager address via HA 
services and connects to it via pod ip."

Do you know whether we can change the way TaskManager connects to HA services: 
do not use ip address and instead use pod name?

I think we cannot add the akka workaround of bypassing the istio sidecar. 
Thanks for the info ~

 

> Adjust Job and Task manager port definitions to work with Istio+mTLS
> 
>
> Key: FLINK-28171
> URL: https://issues.apache.org/jira/browse/FLINK-28171
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.14.4
> Environment: flink-kubernetes-operator 1.0.0
> Flink 1.14-java11
> Kubernetes v1.19.5
> Istio 1.7.6
>Reporter: Moshe Elisha
>Assignee: Moshe Elisha
>Priority: Major
>  Labels: pull-request-available
>
> Hello,
>  
> We are launching Flink deployments using the [Flink Kubernetes 
> Operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/]
>  on a Kubernetes cluster with Istio and mTLS enabled.
>  
> We found that the TaskManager is unable to communicate with the JobManager on 
> the jobmanager-rpc port:
>  
> {{2022-06-15 15:25:40,508 WARN  akka.remote.ReliableDeliverySupervisor        
>                [] - Association with remote system 
> [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123]
>  has failed, address is now gated for [50] ms. Reason: [Association failed 
> with 
> [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123]]
>  Caused by: [The remote system explicitly disassociated (reason unknown).]}}
>  
> The reason for the issue is that the JobManager service port definitions are 
> not following the Istio guidelines 
> [https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/]
>  (see example below).
>  
> There was also an email discussion around this topic in the users mailing 
> group under the subject "Flink Kubernetes Operator with K8S + Istio + mTLS - 
> port definitions".
> With the help of the community, we were able to work around the issue but it 
> was very hard and forced us to skip Istio proxy which is not ideal.
>  
> We would like you to consider changing the default port definitions, either
>  # Rename the ports – I understand it is Istio specific guideline but maybe 
> it is better to at least be aligned with one (popular) vendor guideline 
> instead of none at all.
>  # Add the “appProtocol” property[1] that is not specific to any vendor but 
> requires Kubernetes >= 1.19 where it was introduced as beta and moved to 
> stable in >= 1.20. The option to add appProtocol property was added only in 
> [https://github.com/fabric8io/kubernetes-client/releases/tag/v5.10.0] with 
> [#3570|https://github.com/fabric8io/kubernetes-client/issues/3570].
>  # Or allow a way to override the defaults.
>  
> [https://kubernetes.io/docs/concepts/services-networking/_print/#application-protocol]
>  
>  
> {{# k get service inference-results-to-analytics-engine -o yaml}}
> {{apiVersion: v1}}
> {{kind: Service}}
> {{...}}
> {{spec:}}
> {{  clusterIP: None}}
> {{  ports:}}
> {{  - name: jobmanager-rpc *# should start with “tcp-“ or add "appProtocol" 
> property*}}
> {{    port: 6123}}
> {{    protocol: TCP}}
> {{    targetPort: 6123}}
> {{  - name: blobserver *# should start with 

[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163427396


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##
@@ -86,6 +89,16 @@ public void testDoNotAllowMultipleTopicSelector() {
 .isInstanceOf(IllegalStateException.class);
 }
 
+@Test
+public void testDoNotAllowMultipleHeaderProducers() {

Review Comment:
   I don't think you really need this restriction for the headers.
   
   The key / value serializer setters had the check because there were 
different entry points for setting the serialization schema, i.e. either via 
`setKafkaXSerializer` or `setKeySerializationSchema`. We don't want the user to 
be setting the key serializer through both entrypoints, hence the check.
   
   This isn't a problem for the header provider setting. Same goes for the 
Kafka partitioner, for example - we don't check multiple sets there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-04-11 Thread via GitHub


rkhachatryan commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1163352135


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -980,8 +963,7 @@ protected CompletableFuture getCompletionFuture() {
 
 @Override
 public final void cancel() throws Exception {
-isRunning = false;
-canceled = true;
+taskState.canceled = true;

Review Comment:
   Shouldn't we also update `taskState.status` here?
   
   (ideally, in my view we should use either enum or a set of flags inside the 
`TaskState`, but not both)



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -1797,4 +1779,74 @@ public interface CanEmitBatchOfRecordsChecker {
 
 boolean check();
 }
+
+/** Possible states of a Task. */
+private static class TaskState {
+/**
+ * An enumeration of all states that a task can be in during its 
execution. Transitions
+ * usually follow the diagram bellow:
+ *
+ * {@code
+ * INITIALIZED  -> RESTORING -> RUNNING -> FINISHED
+ * | |   |
+ * | |   |
+ * | V   |
+ * |   CANCELED -+

Review Comment:
   `CANCELED` state was removed from the code.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -1797,4 +1779,74 @@ public interface CanEmitBatchOfRecordsChecker {
 
 boolean check();
 }
+
+/** Possible states of a Task. */
+private static class TaskState {
+/**
+ * An enumeration of all states that a task can be in during its 
execution. Transitions
+ * usually follow the diagram bellow:
+ *
+ * {@code
+ * INITIALIZED  -> RESTORING -> RUNNING -> FINISHED
+ * | |   |
+ * | |   |
+ * | V   |
+ * |   CANCELED -+
+ * ||
+ * ++
+ *
+ * }
+ *
+ * Task enters {@code RESTORING} status before {@code RUNNING} to 
restore an invokable
+ * object from the last valid state, if any.
+ *
+ * Task enters {@code FINISHED} status through cleanup method, 
regardless of
+ * cancellations or if the previous call succeeded.
+ *
+ * It is possible for a Task to be {@code failing} while being in 
any status e.g, failing
+ * == true while status is RUNNING
+ */
+private enum Status {
+/** Task has successfully terminated. */
+FINISHED(),
+/** The task is "in operation". */
+RUNNING(FINISHED),
+/** The task is restoring during {@link #restore()}. */
+RESTORING(RUNNING, FINISHED),
+/** Task constructor was called on init state. */
+INITIALIZED(RESTORING);
+
+Status[] transitions;
+
+Status(Status... transitions) {
+this.transitions = transitions;
+}
+}
+
+/**
+ * Task is failing e.g., if an exception occurred inside {@link 
#invoke()}. Note that this
+ * can happen while Task is still in Running status.
+ */
+private volatile boolean failing = false;
+
+/**
+ * Task has been canceled. Note that can value can be true even while 
Task is still in
+ * Running status, e.g., canceled externally by TaskCanceler.
+ */
+private volatile boolean canceled = false;
+
+private volatile Status status = Status.INITIALIZED;
+
+final boolean transitionTo(Status newStatus) {
+return 
Arrays.stream(status.transitions).anyMatch(newStatus::equals);
+}

Review Comment:
   According to this method name and usages, should it actually update the 
state? (rather than just returning boolean)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21833: [FLINK-30852][runtime] Checking task cancelation explicitly rather than failing the cancel method

2023-04-11 Thread via GitHub


rkhachatryan commented on code in PR #21833:
URL: https://github.com/apache/flink/pull/21833#discussion_r1163338534


##
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##
@@ -1300,9 +1304,7 @@ public TestInvokableCorrect(Environment environment) {
 public void invoke() {}
 
 @Override
-public void cancel() {
-fail("This should not be called");

Review Comment:
   nit: although this won't cause test failure, I think logging (unexpected) 
cancellation makes debugging and reading the code a bit easier



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #563: [FLINK-31716] Event UID field is missing the first time that an event…

2023-04-11 Thread via GitHub


morhidi commented on PR #563:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/563#issuecomment-1504075103

   +1 LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22377: [FLINK-31765][runtime][test] Disable changelog backend for RocksDB migration tests

2023-04-11 Thread via GitHub


rkhachatryan commented on code in PR #22377:
URL: https://github.com/apache/flink/pull/22377#discussion_r1163316537


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##
@@ -136,6 +136,9 @@ public void testSavepoint() throws Exception {
 switch (snapshotSpec.getStateBackendType()) {
 case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
 env.setStateBackend(new EmbeddedRocksDBStateBackend());
+// disable changelog backend for now to ensure determinism in 
test data generation
+// (see FLINK-31766)
+env.enableChangelogStateBackend(false);

Review Comment:
   nit: this change duplicates line 155, one can be removed
   
   nit: can be scoped to `executionMode == ExecutionMode.CREATE_SNAPSHOT` only



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #563: [FLINK-31716] Event UID field is missing the first time that an event…

2023-04-11 Thread via GitHub


morhidi commented on code in PR #563:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/563#discussion_r1163279331


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java:
##
@@ -72,6 +72,7 @@ public void accept(Event event) {
 .withName(eventName)
 .get();
 Assertions.assertEquals(event.getMetadata().getUid(), 
eventConsumed.getMetadata().getUid());

Review Comment:
   This is redundant with the assertions below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on pull request #18: [hotfix] Use 1.0.0-1.16 baseline for API compatibility checks

2023-04-11 Thread via GitHub


reta commented on PR #18:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/18#issuecomment-1504021457

   @MartijnVisser if you have time, super small one, thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] rodmeneses opened a new pull request, #563: [FLINK-31716] Event UID field is missing the first time that an event…

2023-04-11 Thread via GitHub


rodmeneses opened a new pull request, #563:
URL: https://github.com/apache/flink-kubernetes-operator/pull/563

   
   What is the purpose of the change
   
   Fixes a bug reported on https://issues.apache.org/jira/browse/FLINK-31716 
where the event being consumed for the first time didn't have an UID field
   
   This ticket was reopened for two reasons:
   1. Before the check was only being done when the event was created for the 
first time, and not on updates
   2. We are now accepting the actual return value of the `.createOrReplace` 
call. Before we were only using the `uid` field of it.
   
   Brief change log
   
   Fixes a bug on EventUtils where the event consumed was missing the UID field
   Verifying this change
   
   This change modified existing unit tests.
   
   Does this pull request potentially affect one of the following parts:
   
   Dependencies (does it add or upgrade a dependency): no
   The public API, i.e., is any changes to the CustomResourceDescriptors: no
   Core observer or reconciler logic that is regularly executed: no
   Documentation
   
   Does this pull request introduce a new feature? no
   If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31743) Avoid relocating the RocksDB's log failure when filename exceeds 255 characters

2023-04-11 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711097#comment-17711097
 ] 

Roman Khachatryan commented on FLINK-31743:
---

[~assassinj] are you still planning to work on this issue?

> Avoid relocating the RocksDB's log failure when filename exceeds 255 
> characters
> ---
>
> Key: FLINK-31743
> URL: https://issues.apache.org/jira/browse/FLINK-31743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1, 1.15.4
>Reporter: jinghaihang
>Assignee: jinghaihang
>Priority: Major
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Since FLINK-24785 , the file name of the rocksdb LOG is generated by parsing 
> the db path, when the db path is long and the filename exceeds 255 
> characters, the creation of the file will fail, so the relevant rocksdb LOG 
> cannot be seen in the flink log dir.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled

2023-04-11 Thread Sergio Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Sainz updated FLINK-31775:
-
Description: 
When using native kubernetes deployment mode with high-availability (HA), and 
when new TaskManager pod is started to process a job, the TaskManager pod will 
attempt to register itself to the resource manager (JobManager). the 
TaskManager looks up the resource manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice in my case , it is not possible to disable istio as explained here: 
[https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator and this is about native 
kubernetes deployment.

 

  was:
When using native kubernetes deployment mode with high-availability, and when 
new TaskManager pod is started to process a job, the TaskManager pod will 
attempt to register itself to the resource manager (JobManager). the 
TaskManager looks up the resource manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice in my case , it is not possible to disable istio as explained here: 
[https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator and this is about native 
kubernetes deployment.

 


> High-Availability not supported in kubernetes when istio enabled
> 
>
> Key: FLINK-31775
> URL: https://issues.apache.org/jira/browse/FLINK-31775
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.16.1
>Reporter: Sergio Sainz
>Priority: Major
>
> When using native kubernetes deployment mode with high-availability (HA), and 
> when new TaskManager pod is started to process a job, the TaskManager pod 
> will attempt to register itself to the resource manager (JobManager). the 
> TaskManager looks up the resource manager per ip-address 
> (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)
>  
> Nevertheless when istio is enabled, the resolution by ip address is blocked, 
> and hence we see that the job cannot start because task manager cannot 
> register with the resource manager:
> 2023-04-10 23:24:19,752 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
> resolve ResourceManager address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
> 1 ms: Could not connect to rpc endpoint under address 
> 

[jira] [Reopened] (FLINK-31716) Event UID field is missing the first time that an event is consumed

2023-04-11 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reopened FLINK-31716:


> Event UID field is missing the first time that an event is consumed
> ---
>
> Key: FLINK-31716
> URL: https://issues.apache.org/jira/browse/FLINK-31716
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Rodrigo Meneses
>Assignee: Rodrigo Meneses
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0
>
>
> on `EventUtils.createOrUpdateEvent` we use a `Consumer` instance to 
> `accept` the underlying event that is being created or updated.
> The first time an event is created, we are calling 
> `client.resource(event).createOrReplace()` but we are discarding the return 
> value of such method, and we are returning the `event` that we just created, 
> which has an empty UID field.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled

2023-04-11 Thread Sergio Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Sainz updated FLINK-31775:
-
Description: 
When using native kubernetes deployment mode with high-availability, and when 
new TaskManager pod is started to process a job, the TaskManager pod will 
attempt to register itself to the resource manager (JobManager). the 
TaskManager looks up the resource manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice in my case , it is not possible to disable istio as explained here: 
[https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator and this is about native 
kubernetes deployment.

 

  was:
When using native kubernetes deployment mode, and when new TaskManager pod is 
started to process a job, the TaskManager pod will attempt to register itself 
to the resource manager (JobManager). the TaskManager looks up the resource 
manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice in my case , it is not possible to disable istio as explained here: 
[https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator and this is about native 
kubernetes deployment.

 


> High-Availability not supported in kubernetes when istio enabled
> 
>
> Key: FLINK-31775
> URL: https://issues.apache.org/jira/browse/FLINK-31775
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.16.1
>Reporter: Sergio Sainz
>Priority: Major
>
> When using native kubernetes deployment mode with high-availability, and when 
> new TaskManager pod is started to process a job, the TaskManager pod will 
> attempt to register itself to the resource manager (JobManager). the 
> TaskManager looks up the resource manager per ip-address 
> (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)
>  
> Nevertheless when istio is enabled, the resolution by ip address is blocked, 
> and hence we see that the job cannot start because task manager cannot 
> register with the resource manager:
> 2023-04-10 23:24:19,752 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
> resolve ResourceManager address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
> 1 ms: Could not connect to rpc endpoint under address 
> 

[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled

2023-04-11 Thread Sergio Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Sainz updated FLINK-31775:
-
Description: 
When using native kubernetes deployment mode, and when new TaskManager pod is 
started to process a job, the TaskManager pod will attempt to register itself 
to the resource manager (JobManager). the TaskManager looks up the resource 
manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice in my case , it is not possible to disable istio as explained here: 
[https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator and this is about native 
kubernetes deployment.

 

  was:
When using native kubernetes deployment mode, and when new TaskManager pod is 
started to process a job, the TaskManager pod will attempt to register itself 
to the resource manager (JobManager). the TaskManager looks up the resource 
manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice in my case , it is not possible to disable istio as explained here: 
[https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator.

 


> High-Availability not supported in kubernetes when istio enabled
> 
>
> Key: FLINK-31775
> URL: https://issues.apache.org/jira/browse/FLINK-31775
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.16.1
>Reporter: Sergio Sainz
>Priority: Major
>
> When using native kubernetes deployment mode, and when new TaskManager pod is 
> started to process a job, the TaskManager pod will attempt to register itself 
> to the resource manager (JobManager). the TaskManager looks up the resource 
> manager per ip-address 
> (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)
>  
> Nevertheless when istio is enabled, the resolution by ip address is blocked, 
> and hence we see that the job cannot start because task manager cannot 
> register with the resource manager:
> 2023-04-10 23:24:19,752 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
> resolve ResourceManager address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
> 1 ms: Could not connect to rpc endpoint under address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.
>  
> Notice that when HA is disabled, the resolution of the resource manager 

[jira] [Updated] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled

2023-04-11 Thread Sergio Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Sainz updated FLINK-31775:
-
Description: 
When using native kubernetes deployment mode, and when new TaskManager pod is 
started to process a job, the TaskManager pod will attempt to register itself 
to the resource manager (JobManager). the TaskManager looks up the resource 
manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://flink@myenv-dev-flink-cluster.myenv-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice in my case , it is not possible to disable istio as explained here: 
[https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html]

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator.

 

  was:
When using native kubernetes deployment mode, and when new TaskManager is 
started to process a job, the TaskManager will attempt to register itself to 
the resource manager (job manager). the TaskManager looks up the resource 
manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://fl...@local-mci-ar32a-dev-flink-cluster.mstr-env-mci-ar32a-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice it is not possible to disable istio (as explained here : 
https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html)

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator.


> High-Availability not supported in kubernetes when istio enabled
> 
>
> Key: FLINK-31775
> URL: https://issues.apache.org/jira/browse/FLINK-31775
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.16.1
>Reporter: Sergio Sainz
>Priority: Major
>
> When using native kubernetes deployment mode, and when new TaskManager pod is 
> started to process a job, the TaskManager pod will attempt to register itself 
> to the resource manager (JobManager). the TaskManager looks up the resource 
> manager per ip-address 
> (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)
>  
> Nevertheless when istio is enabled, the resolution by ip address is blocked, 
> and hence we see that the job cannot start because task manager cannot 
> register with the resource manager:
> 2023-04-10 23:24:19,752 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
> resolve ResourceManager address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
> 1 ms: Could not connect to rpc endpoint under address 
> akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.
>  
> Notice that when HA is disabled, the resolution of the resource manager is 
> made by service name and so the resource 

[jira] [Created] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled

2023-04-11 Thread Sergio Sainz (Jira)
Sergio Sainz created FLINK-31775:


 Summary: High-Availability not supported in kubernetes when istio 
enabled
 Key: FLINK-31775
 URL: https://issues.apache.org/jira/browse/FLINK-31775
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.16.1
Reporter: Sergio Sainz


When using native kubernetes deployment mode, and when new TaskManager is 
started to process a job, the TaskManager will attempt to register itself to 
the resource manager (job manager). the TaskManager looks up the resource 
manager per ip-address 
(akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1)

 

Nevertheless when istio is enabled, the resolution by ip address is blocked, 
and hence we see that the job cannot start because task manager cannot register 
with the resource manager:

2023-04-10 23:24:19,752 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1.

 

Notice that when HA is disabled, the resolution of the resource manager is made 
by service name and so the resource manager can be found

 

2023-04-11 00:49:34,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://fl...@local-mci-ar32a-dev-flink-cluster.mstr-env-mci-ar32a-dev:6123/user/rpc/resourcemanager_*
 under registration id 83ad942597f86aa880ee96f1c2b8b923.

 

Notice it is not possible to disable istio (as explained here : 
https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html)

 

Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging 
as separate defect as I believe the fix of FLINK-28171 won't fix this case. 
FLINK-28171  is about Flink Kubernetes Operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser closed pull request #22371: [FLINK-28171] [flink-kubernates] enable add appProtocol via the configuration and verify it is not overridden by Default port defintion

2023-04-11 Thread via GitHub


MartijnVisser closed pull request #22371: [FLINK-28171] [flink-kubernates] 
enable add appProtocol via the configuration and verify it is not overridden by 
Default port defintion
URL: https://github.com/apache/flink/pull/22371


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] MartijnVisser commented on pull request #59: [FLINK-31696] ElasticSearch nightly CI failure

2023-04-11 Thread via GitHub


MartijnVisser commented on PR #59:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/59#issuecomment-1503913242

   Nightly triggered at 
https://github.com/apache/flink-connector-elasticsearch/actions/runs/4670944068


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-17508) Develop Oracle Catalog

2023-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-17508:
--

Assignee: (was: RocMarshal)

> Develop Oracle Catalog
> --
>
> Key: FLINK-17508
> URL: https://issues.apache.org/jira/browse/FLINK-17508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Similarly to https://issues.apache.org/jira/browse/FLINK-16471



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31696) ElasticSearch nightly CI failure

2023-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-31696.
--
Fix Version/s: elasticsearch-4.0.0
   Resolution: Fixed

Fixed in main: 85f0f4057a6b241a7d9b0ae2996ac1147e0e2428

> ElasticSearch nightly CI failure
> 
>
> Key: FLINK-31696
> URL: https://issues.apache.org/jira/browse/FLINK-31696
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Danny Cranmer
>Assignee: Andriy Redko
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-4.0.0
>
>
> Investigate and fix the nightly CI failure. Example 
> [https://github.com/apache/flink-connector-elasticsearch/actions/runs/4585918498/jobs/8098357503]
>  
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) 
> on project flink-connector-elasticsearch-base: Execution default-test of goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: 
> org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' 
> failed to discover tests: 'java.lang.Object 
> com.tngtech.archunit.lang.syntax.elements.MethodsThat.areAnnotatedWith(java.lang.Class)'
>  -> [Help 1] {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-elasticsearch] MartijnVisser merged pull request #59: [FLINK-31696] ElasticSearch nightly CI failure

2023-04-11 Thread via GitHub


MartijnVisser merged PR #59:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/59


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] reta commented on pull request #59: [FLINK-31696] ElasticSearch nightly CI failure

2023-04-11 Thread via GitHub


reta commented on PR #59:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/59#issuecomment-1503855950

   > @reta Can you include setting the version for the connector to 
4.0-SNAPSHOT, given that we'll drop support for Flink 1.16 with this PR?
   
   @MartijnVisser it was done already, 
https://github.com/apache/flink-connector-elasticsearch/blob/main/pom.xml#L32 :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

2023-04-11 Thread via GitHub


flinkbot commented on PR #22381:
URL: https://github.com/apache/flink/pull/22381#issuecomment-1503852904

   
   ## CI report:
   
   * d4ac5891e842a21920525a5ed5f0ccce5206718c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31763) Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

2023-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31763:
---
Labels: pull-request-available  (was: )

> Ensure that the total number of requested buffers does not exceed poolSize + 
> maxOverdraftBuffersPerGate
> ---
>
> Key: FLINK-31763
> URL: https://issues.apache.org/jira/browse/FLINK-31763
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.1
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
>
> As we discussed in FLINK-31610, new buffers can be requested only when 
> "{_}numOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments < 
> poolSize + maxOverdraftBuffersPerGate"{_}.
> Consider such a scenario, the {{{}CurrentPoolSize = 5{}}}, 
> {{{}numOfRequestedMemorySegments = 7{}}}, {{{}maxOverdraftBuffersPerGate = 
> 2{}}}. If {{{}numberOfRequestedOverdraftMemorySegments = 0{}}}, then 2 
> buffers can be requested now. 
> We should convert {{numberOfRequestedMemorySegments}} to 
> {{numberOfRequestedOverdraftMemorySegments}} when poolSize is decreased.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa opened a new pull request, #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

2023-04-11 Thread via GitHub


reswqa opened a new pull request, #22381:
URL: https://github.com/apache/flink/pull/22381

   ## What is the purpose of the change
   
   *As we discussed in 
[FLINK-31610](https://issues.apache.org/jira/browse/FLINK-31610), new buffers 
can be requested only when `numOfRequestedMemorySegments + 
numberOfRequestedOverdraftMemorySegments < poolSize + 
maxOverdraftBuffersPerGate`.*
   
   *Consider such a scenario, the `CurrentPoolSize = 5`, 
`numOfRequestedMemorySegments = 7`, `maxOverdraftBuffersPerGate = 2`. If 
`numberOfRequestedOverdraftMemorySegments = 0`, then `2` buffers can be 
requested now.*
   
   *We should convert `numberOfRequestedMemorySegments` to 
`numberOfRequestedOverdraftMemorySegments` when poolSize is decreased.*
   
   
   ## Brief change log
   
 - *Convert `numberOfRequestedMemorySegments` to 
`numberOfRequestedOverdraftMemorySegments` when poolSize is decreased.*
   
   
   ## Verifying this change
   
   Add unit test in `LocalBufferPoolTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes(but 
per-buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163104861


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   @AlexAxeman see my latest comment above about the default case being 
`headerProvider` is null.
   
   In that case, we no longer need / want to return `Optional`.
   
   i.e.
   
   ```
   interface HeaderProvider {
   Headers getHeaders(IN input); // no default implementation
   }
   ```
   
   and in `KafkaRecordSerializationSchemaWrapper#serialize()`:
   ```
   if (headerProvider != null) {
   return new ProducerRecord(
 ...,
 headerProvider.getHeaders(input)
   )
   } else (
   return new ProducerRecord() // no headers
   )
   ```



##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   @AlexAxeman see my latest comment above about the default case being 
`headerProvider` is null.
   
   In that case, we no longer need / want to return `Optional`.
   
   i.e.
   
   ```
   interface HeaderProvider {
   Headers getHeaders(IN input); // no default implementation
   }
   ```
   
   and in `KafkaRecordSerializationSchemaWrapper#serialize()`:
   ```
   if (headerProvider != null) {
   return new ProducerRecord(
 ...,
 headerProvider.getHeaders(input)
   )
   } else (
   return new ProducerRecord(...) // no headers
   )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163104861


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   @AlexAxeman see my latest comment above about the default case being 
`headerProvider` is null.
   
   In that case, we no longer need / want to return `Optional`.
   
   i.e.
   
   ```
   interface HeaderProvider {
   Headers getHeaders(IN input);
   }
   ```
   
   and in `KafkaRecordSerializationSchemaWrapper#serialize()`:
   ```
   if (headerProvider != null) {
   return new ProducerRecord(
 ...,
 headerProvider.getHeaders(input)
   )
   } else (
   return new ProducerRecord() // no headers
   )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


AlexAxeman commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163100734


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   Just reading up on your second comment. Yes, I think that would be better 
than. The RecordProducer also handles this gracefully.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


AlexAxeman commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163098741


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   Hm... Personally I don't really like that. The big thing about Optionals is 
to force the handling of possible `null` values. You don't need that protecting 
while returning collections, as you can always return an empty one, which is 
what I did. I'd much rather go for an empty (immutable) collection.
   
   Initially, I returned a `Iterable` and changed that to kafka API 
Headers upon request. I'm not really leaning to either side, so happy to change 
that back @RamanVerma, wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163096185


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   Actually - on second thought, the default behavior / case can simply be that 
`headerProvider` is `null`. For example, `keySerializer` is nullable for a 
`KafkaRecordSerializationSchema` - the same can apply for the `headerProvider`. 
In this case, we don't need a default implementation for the 
`HeaderProvider#getProvider` method any more.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mohsenrezaeithe commented on pull request #22302: [hotfix] [docs] Fix `ROW()` decleration syntax

2023-04-11 Thread via GitHub


mohsenrezaeithe commented on PR #22302:
URL: https://github.com/apache/flink/pull/22302#issuecomment-1503771608

   @wuchong could you please approve this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta opened a new pull request, #18: [hotfix] Use 1.0.0-1.16 baseline for API compatibility checks

2023-04-11 Thread via GitHub


reta opened a new pull request, #18:
URL: https://github.com/apache/flink-connector-opensearch/pull/18

   Use 1.0.0-1.16 baseline for API compatibility checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163083277


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   I was thinking about `Optional getHeaders(IN input)`, and default 
is returning `Optional.empty()`.
   
   I'm also wondering if it's possible / makes sense to NOT expose the Kafka 
`Headers` class on the public API. Although Kafka API seems to be rather 
stable, in general its best if we can avoid exposing our dependencies' classes 
on the public interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31696) ElasticSearch nightly CI failure

2023-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31696:
---
Labels: pull-request-available  (was: )

> ElasticSearch nightly CI failure
> 
>
> Key: FLINK-31696
> URL: https://issues.apache.org/jira/browse/FLINK-31696
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Danny Cranmer
>Assignee: Andriy Redko
>Priority: Major
>  Labels: pull-request-available
>
> Investigate and fix the nightly CI failure. Example 
> [https://github.com/apache/flink-connector-elasticsearch/actions/runs/4585918498/jobs/8098357503]
>  
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) 
> on project flink-connector-elasticsearch-base: Execution default-test of goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: 
> org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' 
> failed to discover tests: 'java.lang.Object 
> com.tngtech.archunit.lang.syntax.elements.MethodsThat.areAnnotatedWith(java.lang.Class)'
>  -> [Help 1] {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-elasticsearch] reta opened a new pull request, #59: [FLINK-31696] ElasticSearch nightly CI failure

2023-04-11 Thread via GitHub


reta opened a new pull request, #59:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/59

   ElasticSearch nightly CI failure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


reta commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1163061352


##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   No worries at all



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-18027) ROW value constructor cannot deal with complex expressions

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711040#comment-17711040
 ] 

Aitozi commented on FLINK-18027:


[~libenchao] do you know why the explicit ROW construct is removed from the 
doc, (the git history of the file seems not trackable)

Does it's not encouraged to use explicit ROW call ? If not, I think we should 
add it back.

> ROW value constructor cannot deal with complex expressions
> --
>
> Key: FLINK-18027
> URL: https://issues.apache.org/jira/browse/FLINK-18027
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Benchao Li
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> {code:java}
> create table my_source (
> my_row row
> ) with (...);
> create table my_sink (
> my_row row
> ) with (...);
> insert into my_sink
> select ROW(my_row.a, my_row.b) 
> from my_source;{code}
> will throw excepions:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
> parse failed. Encountered "." at line 1, column 18.Exception in thread "main" 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
> "." at line 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:627)
>  at com.bytedance.demo.KafkaTableSource.main(KafkaTableSource.java:76)Caused 
> by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 
> 1, column 18.Was expecting one of:    ")" ...    "," ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>  at 
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) 
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163) at 
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188) at 
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>  ... 3 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: 
> Encountered "." at line 1, column 18.Was expecting one of:    ")" ...    "," 
> ...     at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36161)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35975)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21432)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:17164)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:16820)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16861)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16792)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11091)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10293)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10267)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6943)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16775)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16238)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>  at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>  at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) 
> ... 5 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-17508) Develop Oracle Catalog

2023-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-17508:
---
Summary: Develop Oracle Catalog  (was: Develop OracleCatalog)

> Develop Oracle Catalog
> --
>
> Key: FLINK-17508
> URL: https://issues.apache.org/jira/browse/FLINK-17508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: Flavio Pompermaier
>Assignee: RocMarshal
>Priority: Major
>
> Similarly to https://issues.apache.org/jira/browse/FLINK-16471



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29672) Support oracle catalog

2023-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-29672.
--
Resolution: Duplicate

> Support oracle catalog 
> ---
>
> Key: FLINK-29672
> URL: https://issues.apache.org/jira/browse/FLINK-29672
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.16.0
>Reporter: waywtdcc
>Priority: Major
>
> Support oracle catalog 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-shaded] MartijnVisser commented on a diff in pull request #120: [FLINK-31719][Build/Shaded] Update Netty to 4.1.91.Final

2023-04-11 Thread via GitHub


MartijnVisser commented on code in PR #120:
URL: https://github.com/apache/flink-shaded/pull/120#discussion_r1163023881


##
pom.xml:
##
@@ -355,6 +355,20 @@ under the License.
 
 
 
+
+
+org.cyclonedx
+cyclonedx-maven-plugin

Review Comment:
   Yeah this shouldn't have been here. Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


tzulitai commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163021963


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {

Review Comment:
   > it does not assign the headers, but produces them
   
   In that case, how about `HeaderProvider`? `Provider` seems to be a more 
canonical name for this.
   
   And correspondingly, the single method would be called `getHeaders(IN 
input)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711033#comment-17711033
 ] 

Aitozi commented on FLINK-31774:


I think we should add the description for this two statements similar to the 
INSERT statement. CC [~luoyuxia] I'm willing to work on this.

> Add document for delete and update statement
> 
>
> Key: FLINK-31774
> URL: https://issues.apache.org/jira/browse/FLINK-31774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> I do not find the declaration about the usage of DELETE and UPDATE statement 
> in the SQL section. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31774:
---
Parent: FLINK-30648
Issue Type: Sub-task  (was: Improvement)

> Add document for delete and update statement
> 
>
> Key: FLINK-31774
> URL: https://issues.apache.org/jira/browse/FLINK-31774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> I do not find the declaration about the usage of DELETE and UPDATE statement 
> in the SQL section. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] MartijnVisser commented on pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


MartijnVisser commented on PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#issuecomment-1503642528

   > may I ask you please to trigger nightly action manually
   
   https://github.com/apache/flink-connector-opensearch/actions/runs/4669419298 
is started!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] MartijnVisser commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


MartijnVisser commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1163012687


##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   > I'll fix this afterwards :)
   
   I'm an idiot and got confused by Flink 1.16.1, there's no Flink 1.17.1 yet  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread Aitozi (Jira)
Aitozi created FLINK-31774:
--

 Summary: Add document for delete and update statement
 Key: FLINK-31774
 URL: https://issues.apache.org/jira/browse/FLINK-31774
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi


I do not find the declaration about the usage of DELETE and UPDATE statement in 
the SQL section. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31760) COALESCE() with NULL arguments throws error

2023-04-11 Thread Mohsen Rezaei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711022#comment-17711022
 ] 

Mohsen Rezaei commented on FLINK-31760:
---

[~lincoln.86xy], if I understand correctly, the error message is a generic SQL 
validation thrown for similar uses in the SQL, e.g. 
[{{testValuesWithoutTypeCoercion}}|https://github.com/mohsenrezaeithe/flink/blob/7dd42cf22e85ce2e9dbbde5210edd65bcb94f459/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/NullTypeTest.java#L47-L47].

Are you suggesting to improve the error at the root, or wrap it somewhere in 
the stack and add a better message?

I'll also go ahead and add the {{CAST(NULL AS VARCHAR)}} note in the docs for 
clarity.

> COALESCE() with NULL arguments throws error
> ---
>
> Key: FLINK-31760
> URL: https://issues.apache.org/jira/browse/FLINK-31760
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
> Environment: Flink 1.16.1
>Reporter: Mohsen Rezaei
>Priority: Major
>
> All arguments may not be nullable:
> {code}
> SELECT COALESCE(NULL, NULL)  FROM UnnamedTable$0
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 17 to line 1, column 20: Illegal 
> use of 'NULL'
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
>   at CoalesceTest.main(CoalesceTest.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 17 to line 1, column 20: Illegal use of 'NULL'
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1837)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1912)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:419)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>   at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
>   ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Illegal use 
> of 'NULL'
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> 

[GitHub] [flink-connector-opensearch] reta commented on pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


reta commented on PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#issuecomment-1503618001

   > LGTM
   
   Thanks a lot, may I ask you please to trigger nightly action manually when 
you have an opportunity, thank you (once it is green, I will replicate changes 
to Elasticsearch connector)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] MartijnVisser merged pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


MartijnVisser merged PR #16:
URL: https://github.com/apache/flink-connector-opensearch/pull/16


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] MartijnVisser commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


MartijnVisser commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162981670


##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   I'll fix this afterwards :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31760) COALESCE() with NULL arguments throws error

2023-04-11 Thread Mohsen Rezaei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711014#comment-17711014
 ] 

Mohsen Rezaei commented on FLINK-31760:
---

Hi [~lemonjing], thanks for the quick response.

I'm interested in providing a fix, but I need to dig a bit and see how to best 
address the issue, and familiarize myself with that part of the code. Let me 
know if you have any pointers.

> COALESCE() with NULL arguments throws error
> ---
>
> Key: FLINK-31760
> URL: https://issues.apache.org/jira/browse/FLINK-31760
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
> Environment: Flink 1.16.1
>Reporter: Mohsen Rezaei
>Priority: Major
>
> All arguments may not be nullable:
> {code}
> SELECT COALESCE(NULL, NULL)  FROM UnnamedTable$0
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 17 to line 1, column 20: Illegal 
> use of 'NULL'
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
>   at CoalesceTest.main(CoalesceTest.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 17 to line 1, column 20: Illegal use of 'NULL'
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1837)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1912)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:419)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>   at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
>   ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Illegal use 
> of 'NULL'
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>   at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>   ... 21 more
> {code}
> As 
> 

[jira] (FLINK-31760) COALESCE() with NULL arguments throws error

2023-04-11 Thread Mohsen Rezaei (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31760 ]


Mohsen Rezaei deleted comment on FLINK-31760:
---

was (Author: JIRAUSER298550):
Hi [~lemonjing], thanks for the quick response.

I'm interested in providing a fix, but I need to dig a bit and see how to best 
address the issue, and familiarize myself with that part of the code. Let me 
know if you have any pointers.

> COALESCE() with NULL arguments throws error
> ---
>
> Key: FLINK-31760
> URL: https://issues.apache.org/jira/browse/FLINK-31760
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
> Environment: Flink 1.16.1
>Reporter: Mohsen Rezaei
>Priority: Major
>
> All arguments may not be nullable:
> {code}
> SELECT COALESCE(NULL, NULL)  FROM UnnamedTable$0
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 17 to line 1, column 20: Illegal 
> use of 'NULL'
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
>   at CoalesceTest.main(CoalesceTest.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 17 to line 1, column 20: Illegal use of 'NULL'
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1837)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1912)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:419)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>   at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
>   ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Illegal use 
> of 'NULL'
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>   at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>   ... 21 more
> {code}
> As 
> 

[jira] [Commented] (FLINK-31533) CREATE TABLE AS SELECT should support to define partition

2023-04-11 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711002#comment-17711002
 ] 

Aitozi commented on FLINK-31533:


[~luoyuxia] I think it's a useful feature, and I'd like to support this, can I 
take this ticket ?

> CREATE TABLE AS SELECT should support to define partition
> -
>
> Key: FLINK-31533
> URL: https://issues.apache.org/jira/browse/FLINK-31533
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] godfreyhe commented on a diff in pull request #22376: [FLINK-31767][table-planner] Improve the implementation for "analyze table" execution on partitioned table

2023-04-11 Thread via GitHub


godfreyhe commented on code in PR #22376:
URL: https://github.com/apache/flink/pull/22376#discussion_r1162913583


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java:
##
@@ -137,13 +138,46 @@ private static CatalogColumnStatistics 
mergeColumnStatistics(
 if (!columns.isEmpty()) {
 columnStat = convertToColumnStatistics(row, columns);
 }
-return new Tuple2<>(tableStat, columnStat);
+return new StatisticsWrapper(tableStat, columnStat);
+}
+
+private static Map 
executeSqlAndGenerateStatisticsForMultiParts(
+TableEnvironmentImpl tableEnv,
+List columns,
+String statSql,
+int partitionCount) {
+TableResult tableResult = tableEnv.executeSql(statSql);

Review Comment:
   Thanks for the reminding



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


AlexAxeman commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1162890942


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {
+default Headers produceHeaders(IN input) {
+return new RecordHeaders(new ArrayList<>());

Review Comment:
   Yeah, thought about that. So it should be a `Collections.emptyList()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

2023-04-11 Thread via GitHub


AlexAxeman commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1162889963


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer extends Serializable {

Review Comment:
   Strictly, it does not assign the headers, but produces them. If you're 
concerned about the naming conflict, how about HeaderCreator? (not that I like 
that very much...). I'm happy to rename it to HeaderAssigner otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] nikolasten commented on pull request #16: [BK-3.0][FLINK-30552][Connector/Pulsar] drop batch message size assertion, better set the cursor position.

2023-04-11 Thread via GitHub


nikolasten commented on PR #16:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/16#issuecomment-1503435012

   Disregard my latest comment. 
   When used with flink 1.17 works fine.
   
   If anybody else has a problem and waiting for release, we used snapshot 
version with flink 1.17.
   
   ```gradle 
   maven {
   url "https://repository.apache.org/content/repositories/snapshots;
   }
   implementation 
"org.apache.flink:flink-connector-pulsar:4.0-20230404.065406-2"
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-11 Thread via GitHub


zhipeng93 commented on code in PR #230:
URL: https://github.com/apache/flink-ml/pull/230#discussion_r1162850039


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java:
##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.datastream.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * An operator that implements the co-group logic.
+ *
+ * @param  The class type of the first input.
+ * @param  The class type of the second input.
+ * @param  The class type of the key.
+ * @param  The class type of the output values.
+ */
+public class CoGroupOperator
+extends AbstractUdfStreamOperator>
+implements TwoInputStreamOperator, BoundedMultiInput {
+
+private PushSorter>> sorterA;
+private PushSorter>> sorterB;
+private TypeComparator>> comparatorA;
+private TypeComparator>> comparatorB;
+private KeySelector keySelectorA;
+private KeySelector keySelectorB;
+private TypeSerializer>> 
keyAndValueSerializerA;
+private TypeSerializer>> 
keyAndValueSerializerB;
+private TypeSerializer keySerializer;
+private DataOutputSerializer dataOutputSerializer;
+private long lastWatermarkTimestamp = Long.MIN_VALUE;
+private int remainingInputNum = 2;
+
+public CoGroupOperator(CoGroupFunction function) {
+super(function);
+}
+
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+ClassLoader userCodeClassLoader = 
containingTask.getUserCodeClassLoader();
+MemoryManager memoryManager = 
containingTask.getEnvironment().getMemoryManager();
+IOManager ioManager = containingTask.getEnvironment().getIOManager();
+
+keySelectorA = config.getStatePartitioner(0, userCodeClassLoader);
+keySelectorB = config.getStatePartitioner(1, userCodeClassLoader);
+keySerializer = config.getStateKeySerializer(userCodeClassLoader);
+int keyLength = keySerializer.getLength();
+
+TypeSerializer typeSerializerA = config.getTypeSerializerIn(0, 

[GitHub] [flink] Aitozi commented on pull request #20246: [FLINK-28074][table-planner] show statistics details for DESCRIBE EXTENDED

2023-04-11 Thread via GitHub


Aitozi commented on PR #20246:
URL: https://github.com/apache/flink/pull/20246#issuecomment-1503403992

   I think it's a very useful auxiliary command, can we move this PR forward 
@swuferhong  @godfreyhe ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #22376: [FLINK-31767][table-planner] Improve the implementation for "analyze table" execution on partitioned table

2023-04-11 Thread via GitHub


Aitozi commented on code in PR #22376:
URL: https://github.com/apache/flink/pull/22376#discussion_r1162852786


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java:
##
@@ -137,13 +138,46 @@ private static CatalogColumnStatistics 
mergeColumnStatistics(
 if (!columns.isEmpty()) {
 columnStat = convertToColumnStatistics(row, columns);
 }
-return new Tuple2<>(tableStat, columnStat);
+return new StatisticsWrapper(tableStat, columnStat);
+}
+
+private static Map 
executeSqlAndGenerateStatisticsForMultiParts(
+TableEnvironmentImpl tableEnv,
+List columns,
+String statSql,
+int partitionCount) {
+TableResult tableResult = tableEnv.executeSql(statSql);

Review Comment:
   The empty partition will generate an empty sql here fails the test ci



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reswqa commented on pull request #17: [hotfix] Add opensearch.yml to enable flink to find the data

2023-04-11 Thread via GitHub


reswqa commented on PR #17:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/17#issuecomment-1503369425

   In fact, I prefer this approach, which is what I propose 
[here](https://issues.apache.org/jira/browse/FLINK-31704?focusedCommentId=17707954=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17707954).
 But let's wait for @dannycranmer's opinion again. If we can reach an 
agreement, I am willing to create a ticket to do this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


reta commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162830988


##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   Moved it back to 1.17.0 ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


reta commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162829801


##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   Hm ... there is no 1.17.1 yet ...,
   ```
   Run wget -q -c 
https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
 -O - | tar -xz
 wget -q -c 
https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
 -O - | tar -xz
 shell: /usr/bin/bash -e {0}
 env:
   MVN_COMMON_OPTIONS: -U -B --no-transfer-progress -Dflink.version=1.17.1
   MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false 
-Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
   FLINK_CACHE_DIR: /tmp/cache/flink
   MVN_BUILD_OUTPUT_FILE: /tmp/mvn_build_output.out
   MVN_VALIDATION_DIR: /tmp/flink-validation-deployment
   JAVA_HOME: /opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/8.0.362-9/x64
   JAVA_HOME_8_X64: 
/opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/8.0.362-9/x64
   MVN_DEPENDENCY_CONVERGENCE: -Dflink.convergence.phase=install 
-Pcheck-convergence
   binary_url: 
https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
   cache_binary: true
   
   gzip: stdin: unexpected end of file
   tar: Child returned status 1
   tar: Error is not recoverable: exiting now
   Error: Process completed with exit code 2.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #22364: [FLINK-31744] Include JobVertex info in sparse EG

2023-04-11 Thread via GitHub


dmvk commented on code in PR #22364:
URL: https://github.com/apache/flink/pull/22364#discussion_r1162826386


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -195,6 +195,34 @@ void 
testArchivedCheckpointingSettingsNotNullIfCheckpointingIsEnabled() throws E
 
ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph);
 }
 
+@Test
+void testArchivedJobVerticesPresent() throws Exception {
+final JobGraph jobGraph = createJobGraph();
+jobGraph.setSnapshotSettings(
+new JobCheckpointingSettings(
+CheckpointCoordinatorConfiguration.builder().build(), 
null));
+
+final ArchivedExecutionGraph archivedExecutionGraph =
+new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
+.build(EXECUTOR_RESOURCE.getExecutor())
+.getArchivedExecutionGraph(JobStatus.INITIALIZING, 
null);
+
+ArchivedExecutionJobVertex jobVertex =
+archivedExecutionGraph.getJobVertex(JOB_VERTEX.getID());
+assertThat(jobVertex)
+.isNotNull()
+.satisfies(
+archived -> {
+assertThat(archived.getParallelism())
+.isEqualTo(JOB_VERTEX.getParallelism());
+// JOB_VERTEX.maxP == -1, but we want the actual 
maxP determined by the
+// scheduler
+
assertThat(archived.getMaxParallelism()).isEqualTo(128);
+});

Review Comment:
   nit:
   ```suggestion
   assertThat(jobVertex).isNotNull();
   
assertThat(jobVertex.getParallelism()).isEqualTo(JOB_VERTEX.getParallelism());
   // JOB_VERTEX.maxP == -1, but we want the actual maxP determined by 
the scheduler
   assertThat(jobVertex.getMaxParallelism()).isEqualTo(128);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31655) Adaptive Channel selection for partitioner

2023-04-11 Thread Anton Kalashnikov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710956#comment-17710956
 ] 

Anton Kalashnikov commented on FLINK-31655:
---

Hi [~tartarus] thanks for this discussion.

I think I remember the same discussion which [~gaoyunhaii] mentioned above. I 
just want to add that I tried to implement it as well. You can take a look at 
it here - https://github.com/apache/flink/pull/16224. My solution works without 
any extra synchronization and locks but the performance for other 
rebalances(non-Adaptive) was impacted anyway. I changed only 
`BufferWritingResultPartition#emitRecord` and added a non-volatile variable to 
`PipelinedSubpartition` which was enough for small degradation in the benchmark 
since it is a hot path. 
I don't remember exactly why we stopped discussing it but we should be careful 
about it since Adaptive Rebalance is not so common usage and it will be 
unfortunate if we slow down more common rebalances in favor to have this less 
common one.

> Adaptive Channel selection for partitioner
> --
>
> Key: FLINK-31655
> URL: https://issues.apache.org/jira/browse/FLINK-31655
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: tartarus
>Assignee: tartarus
>Priority: Major
>
> In Flink, if the upstream and downstream operator parallelism is not the 
> same, then by default the RebalancePartitioner will be used to select the 
> target channel.
> In our company, users often use flink to access redis, hbase or other rpc 
> services, If some of the Operators are slow to return requests (for external 
> service reasons), then because Rebalance/Rescale are Round-Robin the Channel 
> selection policy, so the job is easy to backpressure.
> Because the Rebalance/Rescale policy does not care which subtask the data is 
> sent to downstream, so we expect Rebalance/Rescale to refer to the processing 
> power of the downstream subtask when choosing a Channel.
> Send more data to the free subtask, this ensures the best possible throughput 
> of job!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


reta commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162826956


##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   Missed that, updated both to `1.17.1`. I think the snapshots (fe 
`1.18-SNAPSHOT`) are only for weekly builds (since this is very unstable 
target).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] MartijnVisser commented on pull request #17: [hotfix] Add opensearch.yml to enable flink to find the data

2023-04-11 Thread via GitHub


MartijnVisser commented on PR #17:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/17#issuecomment-1503352188

   > So my main question is, if we modify a major bug in the document, do we 
have to wait for the next connector release so that users can see the correct 
version doc.
   
   I don't think we should. I think we should update Flink to updates the 
references so they point to the branch, not the tag. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] MartijnVisser commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


MartijnVisser commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162816269


##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   I actually think you need to add `.github/workflows/push_pr.yml` to set the 
Flink version either to a matrix (with Flink 1.17.1 and potentially Flink 
1.18-SNAPSHOT), or at least that version should be updated to 1.17.1 :)



##
.github/workflows/release.yml:
##
@@ -24,7 +24,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16.1, 1.17.0]
+flink: [1.17.0]

Review Comment:
   I actually think you need to update `.github/workflows/push_pr.yml` to set 
the Flink version either to a matrix (with Flink 1.17.1 and potentially Flink 
1.18-SNAPSHOT), or at least that version should be updated to 1.17.1 :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reswqa commented on pull request #17: [hotfix] Add opensearch.yml to enable flink to find the data

2023-04-11 Thread via GitHub


reswqa commented on PR #17:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/17#issuecomment-1503347392

   I know --branch can point to both branch names and tags. If I understand 
correctly, we only push tags like v3.0.0 forward during each release. So my 
main question is, if we modify a major bug in the document, do we have to wait 
for the next connector release so that users can see the correct version doc. 
If we use v1.0.0 in setup-docs.sh, will there be this problem?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-11 Thread via GitHub


reta commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1162810174


##
.github/workflows/weekly.yml:
##
@@ -26,7 +26,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16-SNAPSHOT, 1.17-SNAPSHOT, 1.18-SNAPSHOT]

Review Comment:
   Thanks a lot @MartijnVisser , rebased, I think we are good to go



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-22727) SerializableAvroSchema cannot handle large schema

2023-04-11 Thread Serge Travin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Serge Travin resolved FLINK-22727.
--
Resolution: Duplicate

> SerializableAvroSchema cannot handle large schema
> -
>
> Key: FLINK-22727
> URL: https://issues.apache.org/jira/browse/FLINK-22727
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.0
>Reporter: Serge Travin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The issue is very similar to the 
> https://issues.apache.org/jira/browse/FLINK-19491 
> Here is the stack trace for the problem.
>  
>  
> {{Caused by: java.lang.RuntimeException: Could not serialize serializer into 
> the configuration.Caused by: java.lang.RuntimeException: Could not serialize 
> serializer into the configuration. at 
> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.writeParametersToConfig(RuntimeSerializerFactory.java:61)
>  ~[flink-core-1.13.0.jar:1.13.0] at 
> org.apache.flink.runtime.operators.util.TaskConfig.setTypeSerializerFactory(TaskConfig.java:1211)
>  ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.runtime.operators.util.TaskConfig.setOutputSerializer(TaskConfig.java:594)
>  ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.connectJobVertices(JobGraphGenerator.java:1336)
>  ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.translateChannel(JobGraphGenerator.java:820)
>  ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:664)
>  ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] ... 15 moreCaused by: 
> java.io.UTFDataFormatException at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2164)
>  ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2007)
>  ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeUTF(ObjectOutputStream.java:869) 
> ~[?:1.8.0_202] at 
> org.apache.flink.formats.avro.typeutils.SerializableAvroSchema.writeObject(SerializableAvroSchema.java:55)
>  ~[flink-avro-1.13.0.jar:1.13.0] at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_202] at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_202] at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_202] at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_202] at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> ~[?:1.8.0_202] at 
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
> ~[?:1.8.0_202] at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>  ~[flink-core-1.13.0.jar:1.13.0] at 
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:546)
>  ~[flink-core-1.13.0.jar:1.13.0] at 
> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.writeParametersToConfig(RuntimeSerializerFactory.java:59)
>  ~[flink-core-1.13.0.jar:1.13.0] at 
> org.apache.flink.runtime.operators.util.TaskConfig.setTypeSerializerFactory(TaskConfig.java:1211)
>  ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.runtime.operators.util.TaskConfig.setOutputSerializer(TaskConfig.java:594)
>  ~[flink-runtime_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.connectJobVertices(JobGraphGenerator.java:1336)
>  ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.translateChannel(JobGraphGenerator.java:820)
>  ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:664)
>  ~[flink-optimizer_2.12-1.13.0.jar:1.13.0] ... 15 more}}
>  



--
This message was sent by Atlassian Jira

  1   2   >