[GitHub] [flink] FangYongs commented on pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

2023-04-05 Thread via GitHub


FangYongs commented on PR #22362:
URL: https://github.com/apache/flink/pull/22362#issuecomment-1498529894

   Hi @libenchao, please help review this PR when you're free. Thanks!





[GitHub] [flink] liuyongvs commented on a diff in pull request #19873: [FLINK-27891][Table] Add ARRAY_APPEND and ARRAY_PREPEND functions

2023-04-05 Thread via GitHub


liuyongvs commented on code in PR #19873:
URL: https://github.com/apache/flink/pull/19873#discussion_r1159302878


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategy.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Type strategy that returns a {@link DataTypes#ARRAY(DataType)} with element 
type equal to the
+ * type of the first argument if it's not nullable or element to add is not 
nullable, otherwise it

Review Comment:
   Please add a unit test for ArrayAppendPrependTypeStrategy, similar to 
CurrentWatermarkTypeStrategyTest.
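   For illustration, a rough, hypothetical sketch of such a test (it does not use the TypeStrategiesTestBase harness that CurrentWatermarkTypeStrategyTest follows, and it assumes the strategy only reads the argument data types from the CallContext):

```java
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;

import org.junit.jupiter.api.Test;

class ArrayAppendPrependTypeStrategyTest {

    @Test
    void elementTypeBecomesNullableWhenAppendedElementIsNullable() {
        // ARRAY_APPEND(ARRAY<INT NOT NULL>, INT) -> ARRAY<INT>: the appended element
        // is nullable, so the result's element type should become nullable too.
        CallContext callContext = mock(CallContext.class);
        when(callContext.getArgumentDataTypes())
                .thenReturn(
                        Arrays.asList(
                                DataTypes.ARRAY(DataTypes.INT().notNull()),
                                DataTypes.INT().nullable()));

        Optional<DataType> inferred =
                new ArrayAppendPrependTypeStrategy().inferType(callContext);

        assertThat(inferred).contains(DataTypes.ARRAY(DataTypes.INT().nullable()));
    }
}
```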






[GitHub] [flink] mas-chen commented on pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

2023-04-05 Thread via GitHub


mas-chen commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1498516945

   > > arithmetic overflow when the source is idle" or something similar?
   > 
   > Absolutely! 
"[[FLINK-31632](https://issues.apache.org/jira/browse/FLINK-31632)] Fix 
maxAllowedWatermark arithmetic overflow when the source is idle" sounds great! 
Do we edit the commit message on merging this pr or I re-push?
   
   Let's try to do that before merging. You can squash the commits into one and 
amend the commit message. It will make it easier for the committer to help 
merge =)
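   For reference, one common way to do that locally (assuming the PR branch is based on `origin/master`; the branch name below is a placeholder):

```sh
git rebase -i origin/master          # squash the commits into a single commit
git commit --amend                   # reword it to the agreed "[FLINK-31632] ..." message
git push --force-with-lease origin my-flink-31632-branch
```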





[GitHub] [flink] mas-chen commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

2023-04-05 Thread via GitHub


mas-chen commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159300038


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##
@@ -167,6 +167,9 @@ public void testWatermarkAlignmentWithIdleness() throws 
Exception {
 assertThat(operator.emitNext(actualOutput), 
is(DataInputStatus.NOTHING_AVAILABLE));
 context.getTimeService().advance(1);
 assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+// receive Long.MAX_VALUE as WatermarkAlignmentEvent

Review Comment:
   Oh, I see. Does it make sense to call operator.handleOperatorEvent(new 
WatermarkAlignmentEvent(...)) after every `assertLatestReportedWatermarkEvent` 
(line 162)?






[jira] [Commented] (FLINK-24785) Relocate RocksDB's log under flink log directory by default

2023-04-05 Thread jinghaihang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709211#comment-17709211
 ] 

jinghaihang commented on FLINK-24785:
-

[~yunta], thanks for your reply.

I agree with you; please assign the task to me.

> Relocate RocksDB's log under flink log directory by default
> ---
>
> Key: FLINK-24785
> URL: https://issues.apache.org/jira/browse/FLINK-24785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Previously, RocksDB's log is located in its own DB folder, which makes 
> debugging RocksDB not so easy. We could let RocksDB's log stay in Flink's log 
> directory by default.





[jira] [Commented] (FLINK-31742) Replace deprecated TableSchema in flink-table-planner test

2023-04-05 Thread Ran Tao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709210#comment-17709210
 ] 

Ran Tao commented on FLINK-31742:
-

[~icshuo] Yes, I changed the issue name. We can replace these old usages in the 
next or a 2.x release. WDYT?
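For illustration, a minimal sketch of the kind of replacement the ticket describes (column names are made up): the deprecated TableSchema builder is replaced by the Schema API, which the catalog/planner then resolves into a ResolvedSchema.

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class SchemaMigrationExample {
    public static void main(String[] args) {
        // Old style (deprecated): TableSchema.builder().field("id", DataTypes.BIGINT()).build()
        // New style: declare an unresolved Schema and let the framework resolve it.
        Schema schema =
                Schema.newBuilder()
                        .column("id", DataTypes.BIGINT().notNull())
                        .column("name", DataTypes.STRING())
                        .primaryKey("id")
                        .build();
        System.out.println(schema);
    }
}
{code}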

> Replace deprecated TableSchema in flink-table-planner test
> --
>
> Key: FLINK-31742
> URL: https://issues.apache.org/jira/browse/FLINK-31742
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: Ran Tao
>Priority: Major
>
> we can try to remove deprecated TableSchema and use Schema & ResolvedSchema 
> to replace it.





[jira] [Updated] (FLINK-31742) Replace deprecated TableSchema in flink-table-planner test

2023-04-05 Thread Ran Tao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao updated FLINK-31742:

Description: we can try to remove deprecated TableSchema and use Schema & 
ResolvedSchema to replace it.  (was: Try to remove deprecated TableSchema and 
use Schema & ResolvedSchema to replace it.)

> Replace deprecated TableSchema in flink-table-planner test
> --
>
> Key: FLINK-31742
> URL: https://issues.apache.org/jira/browse/FLINK-31742
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: Ran Tao
>Priority: Major
>
> we can try to remove deprecated TableSchema and use Schema & ResolvedSchema 
> to replace it.





[jira] [Updated] (FLINK-31742) Replace deprecated TableSchema in flink-table-planner test

2023-04-05 Thread Ran Tao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao updated FLINK-31742:

Summary: Replace deprecated TableSchema in flink-table-planner test  (was: 
Remove deprecated TableSchema in flink-table-planner test)

> Replace deprecated TableSchema in flink-table-planner test
> --
>
> Key: FLINK-31742
> URL: https://issues.apache.org/jira/browse/FLINK-31742
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: Ran Tao
>Priority: Major
>
> Try to remove deprecated TableSchema and use Schema & ResolvedSchema to 
> replace it.





[GitHub] [flink] flinkbot commented on pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

2023-04-05 Thread via GitHub


flinkbot commented on PR #22362:
URL: https://github.com/apache/flink/pull/22362#issuecomment-1498505826

   
   ## CI report:
   
   * e37ef659f2937d871f9e26af1235edd142d736e8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Commented] (FLINK-31742) Remove deprecated TableSchema in flink-table-planner test

2023-04-05 Thread Shuo Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709203#comment-17709203
 ] 

Shuo Cheng commented on FLINK-31742:


Hi [~lemonjing], changes to the public API generally should not be introduced in 
a patch release.

> Remove deprecated TableSchema in flink-table-planner test
> -
>
> Key: FLINK-31742
> URL: https://issues.apache.org/jira/browse/FLINK-31742
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: Ran Tao
>Priority: Major
>
> Try to remove deprecated TableSchema and use Schema & ResolvedSchema to 
> replace it.





[jira] [Updated] (FLINK-31544) Introduce FlinkDatabaseMetaData for jdbc driver

2023-04-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31544:
---
Labels: pull-request-available  (was: )

> Introduce FlinkDatabaseMetaData for jdbc driver
> ---
>
> Key: FLINK-31544
> URL: https://issues.apache.org/jira/browse/FLINK-31544
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> Introduce `FlinkDatabaseMetaData`





[GitHub] [flink] FangYongs opened a new pull request, #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

2023-04-05 Thread via GitHub


FangYongs opened a new pull request, #22362:
URL: https://github.com/apache/flink/pull/22362

   ## What is the purpose of the change
   
   This PR aims to introduce database metadata for the JDBC driver; see the 
usage sketch at the end of this description.
   
   ## Brief change log
   
 - Added `FlinkDatabaseMeta`
 - Added `CloseableResultIterator` for statement result and 
`getCatalogs`/`getSchemas` in database meta
 - Get catalogs and schemas in `FlinkDatabaseMeta`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Added `FlinkSqlDriverTestBase` to build flink cluster and sql gateway
 - Added `FlinkDatabaseMetaDataTest` for `FlinkDatabaseMetaData`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
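   As a usage sketch (not part of this PR), this is roughly how a JDBC client could read the new metadata; the URL format `jdbc:flink://localhost:8083` is only an assumed placeholder, not a confirmed driver URL:

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class FlinkJdbcMetadataExample {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; consult the driver documentation for the real format.
        try (Connection connection =
                DriverManager.getConnection("jdbc:flink://localhost:8083")) {
            DatabaseMetaData metaData = connection.getMetaData();
            // getCatalogs() should expose the catalogs known to the SQL gateway session.
            try (ResultSet catalogs = metaData.getCatalogs()) {
                while (catalogs.next()) {
                    System.out.println(catalogs.getString("TABLE_CAT"));
                }
            }
        }
    }
}
```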
   





[jira] [Reopened] (FLINK-28198) CassandraConnectorITCase fails with timeout

2023-04-05 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reopened FLINK-28198:
-

> CassandraConnectorITCase fails with timeout
> ---
>
> Key: FLINK-28198
> URL: https://issues.apache.org/jira/browse/FLINK-28198
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Cassandra
>Affects Versions: 1.16.0
>Reporter: Martijn Visser
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> Jun 22 07:57:37 [ERROR] 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRaiseCassandraRequestsTimeouts
>   Time elapsed: 12.067 s  <<< ERROR!
> Jun 22 07:57:37 
> com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1:59915] Timed out waiting for server response
> Jun 22 07:57:37   at 
> com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:43)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:25)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=13736





[jira] [Commented] (FLINK-28198) CassandraConnectorITCase fails with timeout

2023-04-05 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709197#comment-17709197
 ] 

Sergey Nuyanzin commented on FLINK-28198:
-

still reproduced for 1.16
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47891=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=15459

> CassandraConnectorITCase fails with timeout
> ---
>
> Key: FLINK-28198
> URL: https://issues.apache.org/jira/browse/FLINK-28198
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Cassandra
>Affects Versions: 1.16.0
>Reporter: Martijn Visser
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> Jun 22 07:57:37 [ERROR] 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRaiseCassandraRequestsTimeouts
>   Time elapsed: 12.067 s  <<< ERROR!
> Jun 22 07:57:37 
> com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1:59915] Timed out waiting for server response
> Jun 22 07:57:37   at 
> com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:43)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:25)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58)
> Jun 22 07:57:37   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=13736





[GitHub] [flink] flinkbot commented on pull request #22361: [FLINK-31670][doc] Using v3.0.0-docs branch for es connector docs build.

2023-04-05 Thread via GitHub


flinkbot commented on PR #22361:
URL: https://github.com/apache/flink/pull/22361#issuecomment-1498462140

   
   ## CI report:
   
   * a1679e3923606de92788a5837e9142b27b1d4a47 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] huwh commented on a diff in pull request #22308: [FLINK-31518][Runtime / REST] Fix StandaloneHaServices#getClusterRestEndpointLeaderRetreiver to return correct rest port

2023-04-05 Thread via GitHub


huwh commented on code in PR #22308:
URL: https://github.com/apache/flink/pull/22308#discussion_r1159255563


##
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java:
##
@@ -176,6 +176,10 @@ public DispatcherResourceManagerComponent create(
 log.debug("Starting Dispatcher REST endpoint.");
 webMonitorEndpoint.start();
 
+configuration.setInteger(RestOptions.PORT, 
webMonitorEndpoint.getRestPort());
+configuration.setString(

Review Comment:
   Do we need update this Address?
   
   "getServerAddress()" may produce NPE here.



##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java:
##
@@ -128,11 +128,9 @@ public static HighAvailabilityServices 
createHighAvailabilityServices(
 
RpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
 addressResolution,
 configuration);
-final String webMonitorAddress =
-getWebMonitorAddress(configuration, addressResolution);
 
 return new StandaloneHaServices(
-resourceManagerRpcUrl, dispatcherRpcUrl, 
webMonitorAddress);
+resourceManagerRpcUrl, dispatcherRpcUrl, 
configuration);

Review Comment:
   Do we really need these change if you use ClientHighAvailabilityServices to 
retrieve the rest address and port?
   

   
   






[GitHub] [flink] haishui126 commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

2023-04-05 Thread via GitHub


haishui126 commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1498458381

   > @haishui126 can we add a more meaningful commit message such as 
"[FLINK-XYZ] Fix maxAllowedWatermark arithmetic overflow when the source is 
idle" or something similar?
   
   Absolutely! "[FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when 
the source is idle" sounds great! Do we edit the commit message on merging this 
pr or I re-push?
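   For readers of the thread, a minimal illustration of the overflow the commit title refers to (not the PR's code; variable names are made up), together with a saturating alternative:

```java
public class MaxAllowedWatermarkOverflowExample {
    public static void main(String[] args) {
        // When every source reader is idle, the aggregated watermark can be Long.MAX_VALUE.
        long aggregatedWatermark = Long.MAX_VALUE;
        long maxAllowedWatermarkDrift = 1_000L;

        // Naive addition wraps around to a large negative value.
        long naive = aggregatedWatermark + maxAllowedWatermarkDrift;
        System.out.println(naive); // -9223372036854774809

        // A saturating addition keeps the result pinned at Long.MAX_VALUE instead.
        long saturated =
                aggregatedWatermark >= Long.MAX_VALUE - maxAllowedWatermarkDrift
                        ? Long.MAX_VALUE
                        : aggregatedWatermark + maxAllowedWatermarkDrift;
        System.out.println(saturated); // 9223372036854775807
    }
}
```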





[jira] [Updated] (FLINK-31670) ElasticSearch connector's document was not incorrect linked to external repo

2023-04-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31670:
---
Labels: pull-request-available  (was: )

> ElasticSearch connector's document was not incorrect linked to external repo
> 
>
> Key: FLINK-31670
> URL: https://issues.apache.org/jira/browse/FLINK-31670
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Documentation
>Affects Versions: elasticsearch-3.0.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
>
> In the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/],
>  it still uses "flink-version" for flink-connector-elasticsearch instead of the 
> version in the external repository:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-elasticsearch6</artifactId>
>     <version>1.18-SNAPSHOT</version>
> </dependency>





[GitHub] [flink] reswqa opened a new pull request, #22361: [FLINK-31670][doc] Using v3.0.0-docs branch for es connector docs build.

2023-04-05 Thread via GitHub


reswqa opened a new pull request, #22361:
URL: https://github.com/apache/flink/pull/22361

   ## What is the purpose of the change
   
   *In the 
[doc](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/),
 it still uses "flink-version" for flink-connector-elasticsearch instead of the 
version in the external repository.*
   
   
   ## Brief change log
   
 - *Using v3.0.0-docs branch for es connector docs build.*
   
   
   ## Verifying this change
   
   Manually tested.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-05 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709178#comment-17709178
 ] 

luoyuxia commented on FLINK-31689:
--

[~jirawech.s] Assigned to you~ You can try it locally and open a PR; I can help 
review.

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encountered this error when I tried to use the Filesystem sink with Table SQL. I 
> have not tested with the DataStream API, though. You may refer to the error below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}





[jira] [Assigned] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-05 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-31689:


Assignee: jirawech.s  (was: luoyuxia)

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encountered this error when I tried to use the Filesystem sink with Table SQL. I 
> have not tested with the DataStream API, though. You may refer to the error below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}





[jira] [Assigned] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-05 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-31689:


Assignee: luoyuxia

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: luoyuxia
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encountered this error when I tried to use the Filesystem sink with Table SQL. I 
> have not tested with the DataStream API, though. You may refer to the error below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}





[jira] [Assigned] (FLINK-31741) Supports data conversion according to type for executor

2023-04-05 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li reassigned FLINK-31741:
--

Assignee: Fang Yong

> Supports data conversion according to type for executor
> ---
>
> Key: FLINK-31741
> URL: https://issues.apache.org/jira/browse/FLINK-31741
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> Currently the results in StatementResult are strings; they should be converted 
> to different types according to the data type.





[GitHub] [flink] flinkbot commented on pull request #22360: [FLINK-31741][jdbc-driver] Support data converter for value in statement result

2023-04-05 Thread via GitHub


flinkbot commented on PR #22360:
URL: https://github.com/apache/flink/pull/22360#issuecomment-1498449680

   
   ## CI report:
   
   * dcbaa06d17c31c7d5f9eaff064ea3849fd1b1670 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] FangYongs commented on pull request #22360: [FLINK-31741][jdbc-driver] Support data converter for value in statement result

2023-04-05 Thread via GitHub


FangYongs commented on PR #22360:
URL: https://github.com/apache/flink/pull/22360#issuecomment-1498448558

   Hi @libenchao, please help review this PR when you're free. Thanks!





[jira] [Updated] (FLINK-31741) Supports data conversion according to type for executor

2023-04-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31741:
---
Labels: pull-request-available  (was: )

> Supports data conversion according to type for executor
> ---
>
> Key: FLINK-31741
> URL: https://issues.apache.org/jira/browse/FLINK-31741
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> Currently the results in StatementResult are strings; they should be converted 
> to different types according to the data type.





[GitHub] [flink] FangYongs opened a new pull request, #22360: [FLINK-31741][jdbc-driver] Support data converter for value in statement result

2023-04-05 Thread via GitHub


FangYongs opened a new pull request, #22360:
URL: https://github.com/apache/flink/pull/22360

   ## What is the purpose of the change
   
   This PR aims to convert string values in `StatementResult` to typed values 
for `ResultSet`; see the illustrative sketch at the end of this description.
   
   
   ## Brief change log
   
 - Added `DataConverter` api
 - Added `DefaultDataConverter` and `StringDataConverter`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Added test `FlinkResultSetTest.testStringResultSetPrimitiveData`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
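   For illustration only, a hypothetical sketch of the converter idea (the real `DataConverter` added by this PR may have a different shape; all names and signatures below are made up):

```java
import java.math.BigDecimal;

/** Hypothetical converter from the string values of a StatementResult to typed values. */
interface DataConverter {
    Object convert(String value, Class<?> targetType);
}

/** Hypothetical default implementation: parse the string according to the requested type. */
class DefaultDataConverter implements DataConverter {
    @Override
    public Object convert(String value, Class<?> targetType) {
        if (value == null) {
            return null;
        } else if (targetType == Integer.class) {
            return Integer.valueOf(value);
        } else if (targetType == Long.class) {
            return Long.valueOf(value);
        } else if (targetType == Boolean.class) {
            return Boolean.valueOf(value);
        } else if (targetType == BigDecimal.class) {
            return new BigDecimal(value);
        }
        // Fall back to the raw string, which is all a pure string converter would return.
        return value;
    }
}
```

   A `ResultSet` implementation could then delegate `getInt`/`getLong`/etc. to such a converter, while a string-only converter keeps today's behavior.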
   





[jira] [Created] (FLINK-31742) Remove deprecated TableSchema in flink-table-planner test

2023-04-05 Thread Ran Tao (Jira)
Ran Tao created FLINK-31742:
---

 Summary: Remove deprecated TableSchema in flink-table-planner test
 Key: FLINK-31742
 URL: https://issues.apache.org/jira/browse/FLINK-31742
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: Ran Tao


Try to remove deprecated TableSchema and use Schema & ResolvedSchema to replace 
it.





[jira] [Updated] (FLINK-31732) flink-ml-uber module should include statefun as a dependency

2023-04-05 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31732:
--
Component/s: Library / Machine Learning

> flink-ml-uber module should include statefun as a dependency
> 
>
> Key: FLINK-31732
> URL: https://issues.apache.org/jira/browse/FLINK-31732
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>






[jira] [Closed] (FLINK-31732) flink-ml-uber module should include statefun as a dependency

2023-04-05 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang closed FLINK-31732.
-
Resolution: Won't Fix

We leave statefun as a third-party dependency for now.

> flink-ml-uber module should include statefun as a dependency
> 
>
> Key: FLINK-31732
> URL: https://issues.apache.org/jira/browse/FLINK-31732
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhipeng Zhang
>Priority: Major
>






[jira] [Commented] (FLINK-31610) Refactoring of LocalBufferPool

2023-04-05 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709173#comment-17709173
 ] 

Rui Fan commented on FLINK-31610:
-

Thanks [~akalash] for driving this JIRA, and thanks [~Weijie Guo] [~pnowojski] for 
the discussion. Sorry for the late response.

{quote}Consider such a scenario: {{CurrentPoolSize = 5}}, 
{{numOfRequestedMemorySegments = 7}}, {{maxOverdraftBuffersPerGate = 2}}. If 
{{numberOfRequestedOverdraftMemorySegments = 0}}, then 2 buffers can be requested 
now.

It only happens when poolSize changes.
{quote}
From this case and the definition of the overdraft buffer, it shouldn't be 
possible to request buffers now. General principle: new buffers can be requested 
when {{numOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments < 
poolSize + maxOverdraftBuffersPerGate}}.

For the code design, [~Weijie Guo] and I discussed it in this PR [1] before; I 
didn't find this case at that time. If the case can happen, I think we should 
convert {{numberOfRequestedMemorySegments}} to 
{{numberOfRequestedOverdraftMemorySegments}} when poolSize is decreased.

{quote}I can propose getting rid of numberOfRequestedOverdraftMemorySegments 
and using existing numberOfRequestedMemorySegments instead.
{quote}
Overall, I think it is feasible. New buffers can be requested when 
{{numberOfRequestedMemorySegments < poolSize + maxOverdraftBuffersPerGate}}:
 * When {{numberOfRequestedMemorySegments <= poolSize}}, all buffers are ordinary 
buffers.
 * When {{numberOfRequestedMemorySegments > poolSize}}, the ordinary buffer size = 
poolSize, and the overdraft buffer size = numberOfRequestedMemorySegments - 
poolSize.

Please correct me if I'm wrong.

 

[1] https://github.com/apache/flink/pull/22084/files#r1128926904
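To make the requesting invariant above concrete, here is a small standalone sketch 
(the field names mirror the discussion but are not the actual LocalBufferPool members):

{code:java}
final class BufferRequestGuard {
    private final int maxOverdraftBuffersPerGate;
    private int currentPoolSize;
    private int numberOfRequestedMemorySegments;

    BufferRequestGuard(int currentPoolSize, int maxOverdraftBuffersPerGate) {
        this.currentPoolSize = currentPoolSize;
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
    }

    /** New buffers (ordinary or overdraft) may only be requested under this condition. */
    boolean canRequestNewBuffer() {
        return numberOfRequestedMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate;
    }

    void onBufferRequested() {
        numberOfRequestedMemorySegments++;
    }

    /** Buffers beyond the pool size are, by definition, the overdraft buffers. */
    int overdraftBuffersInUse() {
        return Math.max(0, numberOfRequestedMemorySegments - currentPoolSize);
    }

    /** Shrinking the pool implicitly reclassifies already requested buffers as overdraft. */
    void setPoolSize(int newPoolSize) {
        this.currentPoolSize = newPoolSize;
    }
}
{code}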

> Refactoring of LocalBufferPool
> --
>
> Key: FLINK-31610
> URL: https://issues.apache.org/jira/browse/FLINK-31610
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.17.0
>Reporter: Anton Kalashnikov
>Priority: Major
>
> FLINK-31293 bug highlighted the issue with the internal mutual consistency of 
> different fields in LocalBufferPool. ex.:
> -  `numberOfRequestedOverdraftMemorySegments`
> -  `numberOfRequestedMemorySegments`
> -  `availableMemorySegment`
> -  `currentPoolSize`
> Most of the problem was fixed already(I hope) but it is a good idea to 
> reorganize the code in such a way that all invariants between all fields 
> inside will be clearly determined and difficult to break.
> As one example I can propose getting rid of 
> numberOfRequestedOverdraftMemorySegments and using existing 
> numberOfRequestedMemorySegments instead. That means:
> - the pool will be available when `!availableMemorySegments.isEmpty() && 
> unavailableSubpartitionsCount == 0`
> - we don't request a new `ordinary` buffer when 
> `numberOfRequestedMemorySegments >=  currentPoolSize` but we request the 
> overdraft buffer instead
> - `setNumBuffers` should work automatically without any changes
> I think we can come up with a couple of such improvements to simplify the 
> code.





[GitHub] [flink] huwh commented on a diff in pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

2023-04-05 Thread via GitHub


huwh commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1159241282


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/RequiredRedundantResource.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+/** Immutable profile of the required redundant resources. */
+public final class RequiredRedundantResource {
+private final CPUResource cpuResource;
+private final MemorySize memorySize;
+
+public RequiredRedundantResource(CPUResource cpuResource, MemorySize 
memorySize) {
+this.cpuResource = Preconditions.checkNotNull(cpuResource);
+this.memorySize = Preconditions.checkNotNull(memorySize);

Review Comment:
   Good suggestion.
   
   It makes more sense to consider the subdivided resources from a semantic point 
of view.
   
   Then I don't need to introduce a new RequiredRedundantResource to manage the 
use of redundant resources; using ResourceProfile is sufficient.
   
   Looking forward to your opinion as well @xintongsong 






[GitHub] [flink] haishui126 commented on a diff in pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

2023-04-05 Thread via GitHub


haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159239249


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##
@@ -167,6 +167,9 @@ public void testWatermarkAlignmentWithIdleness() throws 
Exception {
 assertThat(operator.emitNext(actualOutput), 
is(DataInputStatus.NOTHING_AVAILABLE));
 context.getTimeService().advance(1);
 assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+// receive Long.MAX_VALUE as WatermarkAlignmentEvent

Review Comment:
   Yes! I have just read the code and found that `currentMaxDesiredWatermark` 
is maxWatermark by default; that's why the old test passed. I'm not sure 
whether it's right to write it like this now.






[jira] [Commented] (FLINK-31530) For currently commonly used databases, MySQL and Postgres have implemented catalogs. Currently, catalogs are implemented based on Oracle

2023-04-05 Thread xingyuan cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709170#comment-17709170
 ] 

xingyuan cheng commented on FLINK-31530:


[~jark] Hello, when we implement the CDAS whole-database synchronization syntax 
based on Flink, we need to obtain the table information of a given database 
through that database's catalog, so we need catalog implementations for 
different databases. The existing catalogs only support MySQL and PostgreSQL; 
this change adds support for an Oracle catalog.

> For currently commonly used databases, MySQL and Postgres have implemented 
> catalogs. Currently, catalogs are implemented based on Oracle
> 
>
> Key: FLINK-31530
> URL: https://issues.apache.org/jira/browse/FLINK-31530
> Project: Flink
>  Issue Type: Improvement
>Reporter: xingyuan cheng
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-27128) Broadcast stream Iteration closeWith failed starting version 1.12.2

2023-04-05 Thread LiJun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709167#comment-17709167
 ] 

LiJun commented on FLINK-27128:
---

It is apparently a bug from my point of view. Why has this issue been pending 
for almost one year?

> Broadcast stream Iteration closeWith failed starting version 1.12.2
> ---
>
> Key: FLINK-27128
> URL: https://issues.apache.org/jira/browse/FLINK-27128
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.12.2, 1.14.4
>Reporter: Jeff Hu
>Priority: Major
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The following error on version later than 1.12.2:
> "Cannot close an iteration with a feedback DataStream that does not originate 
> from said iteration."
> There might be a bug with AbstractBroadcastStateTransformation.java.
> possible fix:
> predecessor.addAll(regularInput.getTransitivePredecessors());
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractBroadcastStateTransformation.java#L95





[GitHub] [flink] xintongsong commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-05 Thread via GitHub


xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1159213039


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {

Review Comment:
   Why do we need this interface? Or, asked differently, why do we need 
`TierType` and `SupportedTierCombinations` to implement the same interface?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;

Review Comment:
   1. I'd suggest `o.a.f.r.i.n.partition.hybrid.tiered`.
   2. Why does this belong to `upstream`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+/**
+ * The shuffle records will be accumulated to the finished buffer before 
writing to the
+ * corresponding tier and the accumulator is the tier of {@link 
TierType#IN_CACHE}.
+ */
+enum TierType implements TieredStoreMode {
+IN_CACHE,
+IN_MEM,
+IN_DISK,
+IN_REMOTE,
+}
+
+/**
+ * Currently, only these tier combinations are supported. If the 
configured tiers is contained
+ * in the following combinations, an exception will be thrown.
+ */
+enum SupportedTierCombinations implements TieredStoreMode {
+MEMORY,
+MEMORY_DISK,
+MEMORY_DISK_REMOTE,
+}

Review Comment:
   Why are we limiting the supported combinations at all?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the

[GitHub] [flink] reswqa commented on a diff in pull request #22306: [FLINK-31447][runtime] Aligning unit tests of FineGrainedSlotManager with DeclarativeSlotManager

2023-04-05 Thread via GitHub


reswqa commented on code in PR #22306:
URL: https://github.com/apache/flink/pull/22306#discussion_r1159228040


##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java:
##
@@ -114,6 +114,64 @@ void testAllocateSlot() throws Exception {
 assertThat(allocatedFuture).isNotCompletedExceptionally();
 }
 
+@Test
+void testAllocationUpdatesIgnoredIfSlotFreed() throws Exception {
+final FineGrainedTaskManagerTracker taskManagerTracker =
+new FineGrainedTaskManagerTracker();
+final CompletableFuture<
+Tuple6<
+SlotID,
+JobID,
+AllocationID,
+ResourceProfile,
+String,
+ResourceManagerId>>
+requestFuture = new CompletableFuture<>();
+final CompletableFuture responseFuture = new 
CompletableFuture<>();
+final TestingTaskExecutorGateway taskExecutorGateway =
+new TestingTaskExecutorGatewayBuilder()
+.setRequestSlotFunction(
+tuple6 -> {
+requestFuture.complete(tuple6);
+return responseFuture;
+})
+.createTestingTaskExecutorGateway();
+final TaskExecutorConnection taskExecutorConnection =
+new TaskExecutorConnection(ResourceID.generate(), 
taskExecutorGateway);
+taskManagerTracker.addTaskManager(
+taskExecutorConnection, ResourceProfile.ANY, 
ResourceProfile.ANY);
+final ResourceTracker resourceTracker = new DefaultResourceTracker();
+final JobID jobId = new JobID();
+final SlotStatusSyncer slotStatusSyncer =
+new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
+slotStatusSyncer.initialize(
+taskManagerTracker,
+resourceTracker,
+ResourceManagerId.generate(),
+EXECUTOR_RESOURCE.getExecutor());
+
+final CompletableFuture allocatedFuture =
+slotStatusSyncer.allocateSlot(
+taskExecutorConnection.getInstanceID(),
+jobId,
+"address",
+ResourceProfile.ANY);
+final AllocationID allocationId = requestFuture.get().f2;
+assertThat(resourceTracker.getAcquiredResources(jobId))
+.contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
+.hasValueSatisfying(slot -> 
assertThat(slot.getJobId()).isEqualTo(jobId));
+assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
+.hasValueSatisfying(
+slot -> 
assertThat(slot.getState()).isEqualTo(SlotState.PENDING));

Review Comment:
   These two assertions should be able to be combined into one 
`hasValueSatisfying `.






[GitHub] [flink] huwh commented on pull request #22306: [FLINK-31447][runtime] Aligning unit tests of FineGrainedSlotManager with DeclarativeSlotManager

2023-04-05 Thread via GitHub


huwh commented on PR #22306:
URL: https://github.com/apache/flink/pull/22306#issuecomment-1498409854

   Thanks @reswqa, I have updated this PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31741) Supports data conversion according to type for executor

2023-04-05 Thread Fang Yong (Jira)
Fang Yong created FLINK-31741:
-

 Summary: Supports data conversion according to type for executor
 Key: FLINK-31741
 URL: https://issues.apache.org/jira/browse/FLINK-31741
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / JDBC
Affects Versions: 1.18.0
Reporter: Fang Yong


Currently the results in StatementResult are strings; they should be converted to 
different types according to the data type.
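
A minimal sketch of what such a conversion could look like (a hypothetical helper, 
not the actual executor code; it assumes the column's LogicalTypeRoot from 
org.apache.flink.table.types.logical is available):

    // Hypothetical helper: convert the string value of a column into a typed
    // object based on the column's logical type root.
    static Object convertByType(String value, LogicalTypeRoot typeRoot) {
        if (value == null) {
            return null;
        }
        switch (typeRoot) {
            case INTEGER:
                return Integer.valueOf(value);
            case BIGINT:
                return Long.valueOf(value);
            case DOUBLE:
                return Double.valueOf(value);
            case BOOLEAN:
                return Boolean.valueOf(value);
            default:
                return value; // fall back to the raw string representation
        }
    }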



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] mas-chen commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

2023-04-05 Thread via GitHub


mas-chen commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1498406007

   @haishui126 can we add a more meaningful commit message such as "[FLINK-XYZ] 
Fix maxAllowedWatermark arithmetic overflow when the source is idle" or 
something similar?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mas-chen commented on a diff in pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

2023-04-05 Thread via GitHub


mas-chen commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159221918


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##
@@ -167,6 +167,9 @@ public void testWatermarkAlignmentWithIdleness() throws 
Exception {
 assertThat(operator.emitNext(actualOutput), 
is(DataInputStatus.NOTHING_AVAILABLE));
 context.getTimeService().advance(1);
 assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+// receive Long.MAX_VALUE as WatermarkAlignmentEvent

Review Comment:
   I think you want to check the `WatermarkAlignmentEvent` and then assert on 
whether the maxWatermark is correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31729) Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN

2023-04-05 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709152#comment-17709152
 ] 

luoyuxia commented on FLINK-31729:
--

[~smiralex] Thanks for reporting. But I'm wondering why there's a `-U`. Shouldn't 
it always be -D, +I, -D, +I? Is it a bug?

> Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN
> -
>
> Key: FLINK-31729
> URL: https://issues.apache.org/jira/browse/FLINK-31729
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Alexander Smirnov
>Priority: Minor
> Fix For: 1.18.0
>
> Attachments: image-2023-04-05-00-08-32-984.png
>
>
> Currently, in streaming LEFT/RIGHT/FULL OUTER JOIN, Flink SQL doesn't emit 
> UPDATE_BEFORE/UPDATE_AFTER records, but instead explicitly changes the RowKind of 
> output records to INSERT/DELETE for simplicity. However, it doesn't work as 
> expected, because sometimes UPDATE_BEFORE rows can still be emitted. What is more 
> confusing - after an UPDATE_BEFORE record there will be an INSERT record (not 
> UPDATE_AFTER), which can cause bugs in cases where downstream operators process 
> UPDATE records in a different way than INSERT/DELETE (for example, they may 
> assume that after an UPDATE_BEFORE there should be an UPDATE_AFTER record at some 
> point in time).
> How to reproduce:
> Suppose we have tables "source1" and "source2":
> CREATE TABLE source1(
>   id int PRIMARY KEY,
>   c3 bigint
> ) WITH (
>   'connector' = 'kafka',
>    ...
>   'format' = 'debezium-json'
> );
>  
> CREATE TABLE source2(
>   id int PRIMARY KEY,
>   c3 bigint
> ) WITH (
>   'connector' = 'kafka',
>    ...
>   'format' = 'debezium-json'
> );
> And we execute the following query:
> "select  t1.id, t1.c3,t2.id, t2.c3 from source1 t1 left join source2 t2 on 
> t1.id = t2.id"
> Then we insert records one by one:
> source1: 
> {noformat}
> {"before":null,"after":{"id":2,"c3":7121},"op":"c"}{noformat}
> source2: 
> {noformat}
> {"before":null,"after":{"id":2,"c3":364},"op":"c"}{noformat}
> source1: 
> {noformat}
> {"before":{"id":2,"c3":7121},"after":{"id":2,"c3":7222},"op":"u"}{noformat}
> source2: 
> {noformat}
> {"before":{"id":2,"c3":364},"after":{"id":2,"c3":564},"op":"u"}{noformat}
> The result will be as in the following screenshot:
> !image-2023-04-05-00-08-32-984.png!
> Note, that after implementing ticket 
> https://issues.apache.org/jira/browse/FLINK-17337 (support emitting 
> UPDATE_BEFORE/UPDATE_AFTER records not only in inner join) the described bug 
> won't be relevant anymore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zoltar9264 commented on pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-05 Thread via GitHub


zoltar9264 commented on PR #21822:
URL: https://github.com/apache/flink/pull/21822#issuecomment-1498381893

   Hi @curcur and @rkhachatryan , can you help us review this pr ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31301) Unsupported nested columns in column list of insert statement

2023-04-05 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-31301:
---

Assignee: Aitozi

> Unsupported nested columns in column list of insert statement
> -
>
> Key: FLINK-31301
> URL: https://issues.apache.org/jira/browse/FLINK-31301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Assignee: Aitozi
>Priority: Major
>
> Currently an error will be raised when using nested columns in the column list of 
> an insert statement, e.g.,
> {code:java}
> INSERT INTO nested_type_sink (a,b.b1,c.c2,f)
> SELECT a,b.b1,c.c2,f FROM nested_type_src
> {code}
>  
> {code:java}
> java.lang.AssertionError
>     at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement

2023-04-05 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709149#comment-17709149
 ] 

lincoln lee commented on FLINK-31301:
-

[~aitozi] welcome for contributing! assigned to you

> Unsupported nested columns in column list of insert statement
> -
>
> Key: FLINK-31301
> URL: https://issues.apache.org/jira/browse/FLINK-31301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Assignee: Aitozi
>Priority: Major
>
> Currently an error will be raised when using nested columns in the column list of 
> an insert statement, e.g.,
> {code:java}
> INSERT INTO nested_type_sink (a,b.b1,c.c2,f)
> SELECT a,b.b1,c.c2,f FROM nested_type_src
> {code}
>  
> {code:java}
> java.lang.AssertionError
>     at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] gaoyunhaii commented on a diff in pull request #21736: [FLINK-27518][tests] Refactor migration tests to support version update automatically

2023-04-05 Thread via GitHub


gaoyunhaii commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1159187859


##
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration 
tests. A migration test
+ * should implement the {@link MigrationTest} interface and its name should match 
{@code
+ * *(Test|ITCase)(.java|.scala)}. For Scala tests, we also require the class 
name to be the same as
+ * the containing file name.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+private static final String DEFAULT_PATH_PREFIXES = 
"src/test/java,src/test/scala";
+
+private static final Option OPTION_HELP =
+Option.builder()
+.longOpt("help")
+.required(false)
+.hasArg(false)
+.desc("print this help information.")
+.build();
+
+private static final Option OPTION_DIR =
+Option.builder()
+.longOpt("dir")
+.required()
+.hasArg()
+.desc("The root directory for scanning. Required.")
+.build();
+
+private static final Option OPTION_VERSION =
+Option.builder()
+.longOpt("version")
+.required()
+.hasArg()
+.desc("The version to generate. Required.")
+.build();
+
+private static final Option OPTION_PREFIXES =
+Option.builder()
+.longOpt("prefixes")
+.required(false)
+.hasArg()
+.desc(
+"The prefix paths to scan under the root 
directory, separated by \",\". Default to \""
++ DEFAULT_PATH_PREFIXES
++ "\"")
+.build();
+
+private static final Option OPTION_CLASSES =
+Option.builder()
+.longOpt("classes")
+.required(false)
+.hasArg()
+.desc(
+"The specified qualified class name to generate 
test data, "
++ "separated by \",\". This option has a 
higher priority than the prefix option.")
+.build();
+
+private static final Pattern VERSION_PATTERN = 
Pattern.compile("v?([0-9]+)[._]([0-9]+)");
+
+private static final String CLASS_NAME_GROUP = "className";
+private static final Pattern CLASS_NAME_PATTERN =
+Pattern.compile("(?<" + CLASS_NAME_GROUP + 
">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+public static void main(String[] args) {
+for (String s : args) {
+if (s.equals("--" + OPTION_HELP.getLongOpt())) {
+HelpFormatter helpFormatter = new HelpFormatter();
+helpFormatter.setOptionComparator(null);
+helpFormatter.printHelp(
+"java " + 
MigrationTestsSnapshotGenerator.class.getName(), createOptions());
+return;
+ 

[GitHub] [flink] mas-chen commented on a diff in pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

2023-04-05 Thread via GitHub


mas-chen commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159178247


##
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##
@@ -83,6 +83,22 @@ void testWatermarkAlignmentWithIdleness() throws Exception {
 reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
 assertLatestWatermarkAlignmentEvent(subtask0, 1042);
 assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+// all subtasks become idle

Review Comment:
   Looks great!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159168995


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() 
throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+createTestTopic(topic, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+final String bootstraps = getBootstrapServers();
+
+// table with upsert-kafka connector, bounded mode up to offset=2
+final String createTableSql =
+String.format(
+"CREATE TABLE upsert_kafka (\n"
++ "  `user_id` BIGINT,\n"
++ "  `event_id` BIGINT,\n"
++ "  `payload` STRING,\n"
++ "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
++ ") WITH (\n"
++ "  'connector' = 'upsert-kafka',\n"
++ "  'topic' = '%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'key.format' = '%s',\n"
++ "  'value.format' = '%s',\n"
++ "  'value.fields-include' = 'ALL',\n"
++ "  'scan.bounded.mode' = 
'specific-offsets',\n"
++ "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
++ ")",
+topic, bootstraps, format, format);
+tEnv.executeSql(createTableSql);
+
+// insert multiple values to have more records past offset=2
+final String insertValuesSql =
+"INSERT INTO upsert_kafka\n"
++ "VALUES\n"
++ " (1, 100, 'payload 1'),\n"
++ " (2, 101, 'payload 2'),\n"
++ " (3, 102, 'payload 3'),\n"
++ " (1, 100, 'payload')";
+tEnv.executeSql(insertValuesSql).await();
+
+// results should only have records up to offset=2
+final List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+final List expected =
+Arrays.asList(
+changelogRow("+I", 1L, 100L, "payload 1"),
+changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   61e919ecca80bd094bcc431c6f298faa52c6f5c0
   
   this new commit answers my confusion :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159168995


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() 
throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+createTestTopic(topic, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+final String bootstraps = getBootstrapServers();
+
+// table with upsert-kafka connector, bounded mode up to offset=2
+final String createTableSql =
+String.format(
+"CREATE TABLE upsert_kafka (\n"
++ "  `user_id` BIGINT,\n"
++ "  `event_id` BIGINT,\n"
++ "  `payload` STRING,\n"
++ "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
++ ") WITH (\n"
++ "  'connector' = 'upsert-kafka',\n"
++ "  'topic' = '%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'key.format' = '%s',\n"
++ "  'value.format' = '%s',\n"
++ "  'value.fields-include' = 'ALL',\n"
++ "  'scan.bounded.mode' = 
'specific-offsets',\n"
++ "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
++ ")",
+topic, bootstraps, format, format);
+tEnv.executeSql(createTableSql);
+
+// insert multiple values to have more records past offset=2
+final String insertValuesSql =
+"INSERT INTO upsert_kafka\n"
++ "VALUES\n"
++ " (1, 100, 'payload 1'),\n"
++ " (2, 101, 'payload 2'),\n"
++ " (3, 102, 'payload 3'),\n"
++ " (1, 100, 'payload')";
+tEnv.executeSql(insertValuesSql).await();
+
+// results should only have records up to offset=2
+final List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+final List expected =
+Arrays.asList(
+changelogRow("+I", 1L, 100L, "payload 1"),
+changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   
https://github.com/apache/flink-connector-kafka/pull/22/commits/61e919ecca80bd094bcc431c6f298faa52c6f5c0
   
   this new commit answers my confusion :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] mas-chen commented on a diff in pull request #16: [FLINK-30859] Externalize confluent avro related code

2023-04-05 Thread via GitHub


mas-chen commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-kafka/pull/16#discussion_r1159166591


##
flink-formats-confluent/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java:
##


Review Comment:
   I took all of flink-json and deleted the irrelevant code, keeping only the 
Debezium-related code. I found that to be an easier approach when syncing the 
commits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159161297


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##
@@ -145,7 +145,7 @@ public static void validateSinkTopic(ReadableConfig 
tableOptions) {
 }
 }
 
-private static void validateScanStartupMode(ReadableConfig tableOptions) {

Review Comment:
   Changed this by mistake, will revert.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159159130


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() 
throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+createTestTopic(topic, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+final String bootstraps = getBootstrapServers();
+
+// table with upsert-kafka connector, bounded mode up to offset=2
+final String createTableSql =
+String.format(
+"CREATE TABLE upsert_kafka (\n"
++ "  `user_id` BIGINT,\n"
++ "  `event_id` BIGINT,\n"
++ "  `payload` STRING,\n"
++ "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
++ ") WITH (\n"
++ "  'connector' = 'upsert-kafka',\n"
++ "  'topic' = '%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'key.format' = '%s',\n"
++ "  'value.format' = '%s',\n"
++ "  'value.fields-include' = 'ALL',\n"
++ "  'scan.bounded.mode' = 
'specific-offsets',\n"
++ "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
++ ")",
+topic, bootstraps, format, format);
+tEnv.executeSql(createTableSql);
+
+// insert multiple values to have more records past offset=2
+final String insertValuesSql =
+"INSERT INTO upsert_kafka\n"
++ "VALUES\n"
++ " (1, 100, 'payload 1'),\n"
++ " (2, 101, 'payload 2'),\n"
++ " (3, 102, 'payload 3'),\n"
++ " (1, 100, 'payload')";
+tEnv.executeSql(insertValuesSql).await();
+
+// results should only have records up to offset=2
+final List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+final List expected =
+Arrays.asList(
+changelogRow("+I", 1L, 100L, "payload 1"),
+changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   Ah, yes you're right. Another one of those mind-twists I'm still trying to 
wrap my head around 
   So the case I mentioned would never happen then - if the end offset is right 
after a `+I` that happens to be an update, then the consumer side would 
normalize it as a `-U` and `+U`. I'll update the test to cover this case.
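   
   For reference, a rough sketch of what the expected rows could then look like 
   (hypothetical values; it assumes the end offset is moved past the second insert 
   for key (1, 100), which the normalization node should surface as a -U/+U pair):
   
       final List<Row> expected =
               Arrays.asList(
                       changelogRow("+I", 1L, 100L, "payload 1"),
                       changelogRow("+I", 2L, 101L, "payload 2"),
                       changelogRow("-U", 1L, 100L, "payload 1"),
                       changelogRow("+U", 1L, 100L, "payload"));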



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159159130


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() 
throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+createTestTopic(topic, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+final String bootstraps = getBootstrapServers();
+
+// table with upsert-kafka connector, bounded mode up to offset=2
+final String createTableSql =
+String.format(
+"CREATE TABLE upsert_kafka (\n"
++ "  `user_id` BIGINT,\n"
++ "  `event_id` BIGINT,\n"
++ "  `payload` STRING,\n"
++ "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
++ ") WITH (\n"
++ "  'connector' = 'upsert-kafka',\n"
++ "  'topic' = '%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'key.format' = '%s',\n"
++ "  'value.format' = '%s',\n"
++ "  'value.fields-include' = 'ALL',\n"
++ "  'scan.bounded.mode' = 
'specific-offsets',\n"
++ "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
++ ")",
+topic, bootstraps, format, format);
+tEnv.executeSql(createTableSql);
+
+// insert multiple values to have more records past offset=2
+final String insertValuesSql =
+"INSERT INTO upsert_kafka\n"
++ "VALUES\n"
++ " (1, 100, 'payload 1'),\n"
++ " (2, 101, 'payload 2'),\n"
++ " (3, 102, 'payload 3'),\n"
++ " (1, 100, 'payload')";
+tEnv.executeSql(insertValuesSql).await();
+
+// results should only have records up to offset=2
+final List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+final List expected =
+Arrays.asList(
+changelogRow("+I", 1L, 100L, "payload 1"),
+changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   Ah, yes you're right. Another one of those mind-twists I'm still trying to 
wrap my head around 
   So the case I mentioned would never happen then - if the end offset is right 
after a `+I` that is an update, then the consumer side would normalize it as a 
`-U` and `+U`. I'll update the test to cover this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] Gerrrr commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


Gerrrr commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159155943


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() 
throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+createTestTopic(topic, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+final String bootstraps = getBootstrapServers();
+
+// table with upsert-kafka connector, bounded mode up to offset=2
+final String createTableSql =
+String.format(
+"CREATE TABLE upsert_kafka (\n"
++ "  `user_id` BIGINT,\n"
++ "  `event_id` BIGINT,\n"
++ "  `payload` STRING,\n"
++ "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
++ ") WITH (\n"
++ "  'connector' = 'upsert-kafka',\n"
++ "  'topic' = '%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'key.format' = '%s',\n"
++ "  'value.format' = '%s',\n"
++ "  'value.fields-include' = 'ALL',\n"
++ "  'scan.bounded.mode' = 
'specific-offsets',\n"
++ "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
++ ")",
+topic, bootstraps, format, format);
+tEnv.executeSql(createTableSql);
+
+// insert multiple values to have more records past offset=2
+final String insertValuesSql =
+"INSERT INTO upsert_kafka\n"
++ "VALUES\n"
++ " (1, 100, 'payload 1'),\n"
++ " (2, 101, 'payload 2'),\n"
++ " (3, 102, 'payload 3'),\n"
++ " (1, 100, 'payload')";
+tEnv.executeSql(insertValuesSql).await();
+
+// results should only have records up to offset=2
+final List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+final List expected =
+Arrays.asList(
+changelogRow("+I", 1L, 100L, "payload 1"),
+changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   Doesn't upsert-kafka always insert `I` and `D` and derive `-U` and `+U` on 
the consumer side via a normalization node?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] Gerrrr commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


Gerrrr commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159122981


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##
@@ -145,7 +145,7 @@ public static void validateSinkTopic(ReadableConfig 
tableOptions) {
 }
 }
 
-private static void validateScanStartupMode(ReadableConfig tableOptions) {

Review Comment:
   nit: we can keep this `private` as this PR does not add the scan startup 
mode to upsert kafka.



##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##
@@ -189,7 +189,7 @@ private static void validateScanStartupMode(ReadableConfig 
tableOptions) {
 });
 }
 
-private static void validateScanBoundedMode(ReadableConfig tableOptions) {

Review Comment:
   nit: package-private is probably sufficient here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159113822


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() 
throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+createTestTopic(topic, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+final String bootstraps = getBootstrapServers();
+
+// table with upsert-kafka connector, bounded mode up to offset=2
+final String createTableSql =
+String.format(
+"CREATE TABLE upsert_kafka (\n"
++ "  `user_id` BIGINT,\n"
++ "  `event_id` BIGINT,\n"
++ "  `payload` STRING,\n"
++ "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
++ ") WITH (\n"
++ "  'connector' = 'upsert-kafka',\n"
++ "  'topic' = '%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'key.format' = '%s',\n"
++ "  'value.format' = '%s',\n"
++ "  'value.fields-include' = 'ALL',\n"
++ "  'scan.bounded.mode' = 
'specific-offsets',\n"
++ "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
++ ")",
+topic, bootstraps, format, format);
+tEnv.executeSql(createTableSql);
+
+// insert multiple values to have more records past offset=2
+final String insertValuesSql =
+"INSERT INTO upsert_kafka\n"
++ "VALUES\n"
++ " (1, 100, 'payload 1'),\n"
++ " (2, 101, 'payload 2'),\n"
++ " (3, 102, 'payload 3'),\n"
++ " (1, 100, 'payload')";
+tEnv.executeSql(insertValuesSql).await();
+
+// results should only have records up to offset=2
+final List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+final List expected =
+Arrays.asList(
+changelogRow("+I", 1L, 100L, "payload 1"),
+changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   Note: with boundedness set using specific offsets, would there be semantic 
issues if the end offset happens to be between a `-U` and `+U`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai commented on PR #22:
URL: 
https://github.com/apache/flink-connector-kafka/pull/22#issuecomment-1498264961

   cc @twalthr @dawidwys @Gerrrr for review, thank you :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31740) Allow setting boundedness for upsert-kafka SQL connector

2023-04-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31740:
---
Labels: pull-request-available  (was: )

> Allow setting boundedness for upsert-kafka SQL connector
> 
>
> Key: FLINK-31740
> URL: https://issues.apache.org/jira/browse/FLINK-31740
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
>
> With FLINK-24456, we added boundedness options for streaming mode to the SQL 
> Kafka Connector. This was mostly just an exposure of existing functionality 
> that was already available at the DataStream API level.
> We should do the same for the SQL Upsert Kafka Connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-kafka] tzulitai opened a new pull request, #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

2023-04-05 Thread via GitHub


tzulitai opened a new pull request, #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22

   This is a "follow-up" to previous work in 
https://github.com/apache/flink/pull/21808 that added boundedness options for 
the SQL Kafka Connector. This PR does the same for the SQL Upsert Kafka 
Connector.
   
   Please see new tests in `UpsertKafkaTableITCase` for end-to-end usage 
examples of the new boundedness options and the expected behaviour.
   
   Most of the added tests references what was added for the SQL Kafka 
Connector in https://github.com/apache/flink/pull/21808.
   
   ## Changelog
   
   1. Expose `scan.bounded.*` options for upsert Kafka
   2. Add unit tests for source configuration, as well as end-to-end 
integration tests with boundedness enabled
   3. Minor refactorings (tagged `[hotfix]` commits) to share test utilities 
across the tests for SQL Kafka Connector and SQL Upsert Kafka Connector.
   
   ## Testing
   
   Covered in new tests in `UpsertKafkaDynamicTableFactoryTest` and 
`UpsertKafkaTableITCase`.
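   
   As a rough usage illustration (a sketch only; the table name, topic, and 
   bootstrap servers below are placeholders, and the bounded option values mirror 
   those of the regular SQL Kafka connector):
   
       import org.apache.flink.table.api.EnvironmentSettings;
       import org.apache.flink.table.api.TableEnvironment;
       
       public class BoundedUpsertKafkaExample {
           public static void main(String[] args) {
               TableEnvironment tEnv =
                       TableEnvironment.create(EnvironmentSettings.inStreamingMode());
       
               // Hypothetical upsert-kafka table that stops reading at the latest
               // offsets observed when the job starts.
               tEnv.executeSql(
                       "CREATE TABLE bounded_upsert_example (\n"
                               + "  `user_id` BIGINT,\n"
                               + "  `payload` STRING,\n"
                               + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                               + ") WITH (\n"
                               + "  'connector' = 'upsert-kafka',\n"
                               + "  'topic' = 'example-topic',\n"
                               + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                               + "  'key.format' = 'json',\n"
                               + "  'value.format' = 'json',\n"
                               + "  'scan.bounded.mode' = 'latest-offset'\n"
                               + ")");
           }
       }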


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31740) Allow setting boundedness for upsert-kafka SQL connector

2023-04-05 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-31740:
---

 Summary: Allow setting boundedness for upsert-kafka SQL connector
 Key: FLINK-31740
 URL: https://issues.apache.org/jira/browse/FLINK-31740
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


With FLINK-24456, we added boundedness options for streaming mode to the SQL 
Kafka Connector. This was mostly just an exposure of existing functionality 
that was already available at the DataStream API level.

We should do the same for the SQL Upsert Kafka Connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] nateab closed pull request #22313: [FLINK-31660][connector-kafka] fix kafka connector pom so ITCases run in IDE

2023-04-05 Thread via GitHub


nateab closed pull request #22313: [FLINK-31660][connector-kafka] fix kafka 
connector pom so ITCases run in IDE
URL: https://github.com/apache/flink/pull/22313


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] nateab commented on pull request #22313: [FLINK-31660][connector-kafka] fix kafka connector pom so ITCases run in IDE

2023-04-05 Thread via GitHub


nateab commented on PR #22313:
URL: https://github.com/apache/flink/pull/22313#issuecomment-1498155014

   @zentol Thanks for the suggestion, I was able to get it working now after 
building it on the command line.
   
   @twalthr Yes I believe you are correct. 
   
   Here is a new PR that has a cleaner fix targeting the 1.17 branch, 
https://github.com/apache/flink/pull/22359, so I will close this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] nateab commented on a diff in pull request #22313: [FLINK-31660][connector-kafka] fix kafka connector pom so ITCases run in IDE

2023-04-05 Thread via GitHub


nateab commented on code in PR #22313:
URL: https://github.com/apache/flink/pull/22313#discussion_r1159018047


##
flink-connectors/flink-connector-kafka/pom.xml:
##
@@ -225,6 +241,23 @@ under the License.
 


+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>forbid-direct-table-planner-dependencies</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>

Review Comment:
   for this and the below pom changes, we based it off a similar module 
`flink-connector-hive` that had working `TableITCase`s. However, Chesnay 
suggested a cleaner solution, so i will follow through with that instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22359: [FLINK-31660][table] add jayway json path dependency to table-planner

2023-04-05 Thread via GitHub


flinkbot commented on PR #22359:
URL: https://github.com/apache/flink/pull/22359#issuecomment-1498148518

   
   ## CI report:
   
   * c96df5042093ce7ee8ab284888d627a554c5746e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] nateab opened a new pull request, #22359: [FLINK-31660][table] add jayway json path dependency to table-planner

2023-04-05 Thread via GitHub


nateab opened a new pull request, #22359:
URL: https://github.com/apache/flink/pull/22359

   ## What is the purpose of the change
   
   This PR adds the `json-path` dependency to the table-planner pom so that 
`ITCases` in the Kafka connector module can run in the IntelliJ IDE. This is 
due to an old IntelliJ bug that does not handle the maven-shade plugin 
correctly: https://youtrack.jetbrains.com/issue/IDEA-93855.
   
   ## Brief change log
   
 - Add the `json-path` to the `flink-table-planner_${scala.binary.version}` 
pom

   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`KafkaChangelogTableITCase`.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #16: [FLINK-30859] Externalize confluent avro related code

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-kafka/pull/16#discussion_r1158843434


##
flink-formats-confluent/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java:
##


Review Comment:
    makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] mas-chen commented on a diff in pull request #16: [FLINK-30859] Externalize confluent avro related code

2023-04-05 Thread via GitHub


mas-chen commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-kafka/pull/16#discussion_r1158835180


##
flink-formats-confluent/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java:
##


Review Comment:
   Just the Debezium parts, which will require a dependency on flink-json, I 
believe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #16: [FLINK-30859] Externalize confluent avro related code

2023-04-05 Thread via GitHub


tzulitai commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-kafka/pull/16#discussion_r1158798008


##
flink-formats-confluent/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java:
##


Review Comment:
   Do you intend to port all of `flink-json` over or just the Debezium parts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on pull request #22350: [FLINK-31471] Allow setting JobResourceRequirements through WEB UI.

2023-04-05 Thread via GitHub


dmvk commented on PR #22350:
URL: https://github.com/apache/flink/pull/22350#issuecomment-1497788210

   For the UX comments: I'm not a UX person, and this is good enough for the 
demo purpose (it was not meant for serious usage; that's what the REST API is for). 
We touched on this a few weeks back with @knaufk, and the plan is to reach out 
to the ML with the demo and see if any UX-proficient person volunteers to 
improve this.
   
   WDYT? I can, of course, try to pull something off, but there is certainly a 
limit on how good I can make this.
   
   > errors about exceeding max parallelism are not reported to the user or 
handled by the UI
   
    I'll look into that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] reswqa closed pull request #56: [Do Not Merge] Test V3.0 CI

2023-04-05 Thread via GitHub


reswqa closed pull request #56: [Do Not Merge] Test V3.0 CI
URL: https://github.com/apache/flink-connector-elasticsearch/pull/56


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31739) ElasticSearch and Cassandra connector v3.0 branch's CI is not working properly

2023-04-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31739:
---
Labels: pull-request-available  (was: )

> ElasticSearch and Cassandra connector v3.0 branch's CI is not working properly
> --
>
> Key: FLINK-31739
> URL: https://issues.apache.org/jira/browse/FLINK-31739
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra, Connectors / ElasticSearch
>Affects Versions: elasticsearch-3.0.1, cassandra-3.0.1
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-3.0.1, cassandra-3.0.1
>
>
> After FLINK-30963, we no longer manually set {{flink_url}}, but it is 
> required in some connectors' own {{ci.yml}}, which causes CI to fail to run 
> like 
> [this|https://github.com/apache/flink-connector-elasticsearch/actions/runs/4620241065].
> The root of this problem is that these branches do not use the {{ci.yml}} in 
> {{flink-connector-shared-utils}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-cassandra] boring-cyborg[bot] commented on pull request #6: [FLINK-31739][ci] Reuse ci.yml in flink-connector-shared-utils.

2023-04-05 Thread via GitHub


boring-cyborg[bot] commented on PR #6:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/6#issuecomment-1497771418

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] reswqa opened a new pull request, #6: [FLINK-31739][ci] Reuse ci.yml in flink-connector-shared-utils.

2023-04-05 Thread via GitHub


reswqa opened a new pull request, #6:
URL: https://github.com/apache/flink-connector-cassandra/pull/6

   Use the common `ci.yml` in `flink-connector-shared-utils` for the v3.0 branch's 
workflows instead of a standalone yml.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31518) HighAvailabilityServiceUtils resolves wrong webMonitorAddress in Standalone mode

2023-04-05 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708995#comment-17708995
 ] 

Samrat Deb commented on FLINK-31518:


[~huwh] Please review the PR when you have time.

> HighAvailabilityServiceUtils resolves wrong webMonitorAddress in Standalone 
> mode
> 
>
> Key: FLINK-31518
> URL: https://issues.apache.org/jira/browse/FLINK-31518
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / REST
>Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.17.1
>Reporter: Vineeth Naroju
>Assignee: Samrat Deb
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> {{HighAvailabilityServiceUtils#createHighAvailabilityServices()}}  in 
> {{HighAvailabilityMode.NONE}} mode uses 
> {{HighAvailabilityServiceUtils#getWebMonitorAddress()}} to get web monitor 
> address.
> {{HighAvailabilityServiceUtils#getWebMonitorAddress()}} reads only from 
> {{rest.port}} in {{flink-conf.yaml}}. If {{rest.port}} is not enabled, then 
> it returns port number {{0}}. It should dynamically fetch the port number if 
> {{rest.port}} is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31214) Add support for new command line option -py.pythonpath

2023-04-05 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb resolved FLINK-31214.

Resolution: Fixed

master :- f8b0834fb83b7d17ac9eeaf09f2f6cea070c978a

> Add support for new command line option -py.pythonpath
> --
>
> Key: FLINK-31214
> URL: https://issues.apache.org/jira/browse/FLINK-31214
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31739) ElasticSearch and Cassandra connector v3.0 branch's CI is not working properly

2023-04-05 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-31739:
--

 Summary: ElasticSearch and Cassandra connector v3.0 branch's CI is 
not working properly
 Key: FLINK-31739
 URL: https://issues.apache.org/jira/browse/FLINK-31739
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Cassandra, Connectors / ElasticSearch
Affects Versions: elasticsearch-3.0.1, cassandra-3.0.1
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: elasticsearch-3.0.1, cassandra-3.0.1


After FLINK-30963, we no longer manually set {{flink_url}}, but it is required 
in some connectors' own {{ci.yml}}, which causes CI to fail to run like 
[this|https://github.com/apache/flink-connector-elasticsearch/actions/runs/4620241065].
The root of this problem is that these branches do not use the {{ci.yml}} in 
{{flink-connector-shared-utils}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] Samrat002 commented on pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-04-05 Thread via GitHub


Samrat002 commented on PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#issuecomment-1497724491

   
   > 2. Can we create e2e tests using localstack or something similar? I have 
not looked to see if anything exist
   
   Created a separate Jira to add the e2e tests: 
https://issues.apache.org/jira/browse/FLINK-30742 . Adding them in this PR 
would make it too large.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-training] JosemyAB closed pull request #57: Implementation of official exercises

2023-04-05 Thread via GitHub


JosemyAB closed pull request #57: Implementation of official exercises
URL: https://github.com/apache/flink-training/pull/57


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mohsenrezaeithe commented on pull request #21849: [FLINK-30596][Runtime/REST] Fix duplicate jobs when submitting with the same jobId

2023-04-05 Thread via GitHub


mohsenrezaeithe commented on PR #21849:
URL: https://github.com/apache/flink/pull/21849#issuecomment-1497710031

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] akalash closed pull request #19018: [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMul

2023-04-05 Thread via GitHub


akalash closed pull request #19018: [FLINK-25819][runtime] Reordered requesting 
and recycling buffers in order to avoid race condition in 
testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
URL: https://github.com/apache/flink/pull/19018


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] akalash closed pull request #17825: Test case for remapping keygroups

2023-04-05 Thread via GitHub


akalash closed pull request #17825: Test case for remapping keygroups
URL: https://github.com/apache/flink/pull/17825


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] akalash closed pull request #16471: [FLINK-22893][tests] Wait for all task running before externalized ch…

2023-04-05 Thread via GitHub


akalash closed pull request #16471: [FLINK-22893][tests] Wait for all task 
running before externalized ch…
URL: https://github.com/apache/flink/pull/16471


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] akalash closed pull request #16224: Added LoadRebalancePartitioner in order to select the freest subparti…

2023-04-05 Thread via GitHub


akalash closed pull request #16224: Added LoadRebalancePartitioner in order to 
select the freest subparti…
URL: https://github.com/apache/flink/pull/16224


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-05 Thread via GitHub


reta commented on code in PR #16:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/16#discussion_r1158673505


##
.github/workflows/weekly.yml:
##
@@ -26,7 +26,7 @@ jobs:
 if: github.repository_owner == 'apache'
 strategy:
   matrix:
-flink: [1.16-SNAPSHOT, 1.17-SNAPSHOT, 1.18-SNAPSHOT]

Review Comment:
   @MartijnVisser @snuyanzin I was not able to make the nightly e2e tests run 
across all three versions (the changes in `ExecutionConfig` result in NPEs all 
over the place for `1.16-SNAPSHOT`), so I had to drop it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta opened a new pull request, #16: [hotfix] Fix nightly build for 1.18-SNAPSHOT, drop 1.16-SNAPSHOT from the nightly matrix

2023-04-05 Thread via GitHub


reta opened a new pull request, #16:
URL: https://github.com/apache/flink-connector-opensearch/pull/16

   Fixing the compilation issue for 1.18 (please see 
https://github.com/apache/flink-connector-opensearch/actions/runs/4615625503/jobs/8159702829).
   
   ```
   Error:  
/home/runner/work/flink-connector-opensearch/flink-connector-opensearch/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/OpensearchUtil.java:[144,19]
 org.apache.flink.connector.opensearch.OpensearchUtil.MockContext is not 
abstract and does not override abstract method getTargetColumns() in 
org.apache.flink.table.connector.sink.DynamicTableSink.Context
   ```
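   
   For readers unfamiliar with this class of error, a minimal, self-contained 
sketch of it follows; the interface and method names are invented for 
illustration and are not the actual `DynamicTableSink.Context` API:
   
   ```java
// Adding a new abstract method to an interface breaks every existing
// concrete implementor; the names here are invented, not Flink's API.
import java.util.Optional;

interface SinkContext {
    boolean isBounded();

    // method newly introduced in the version the code is now compiled against
    Optional<int[][]> targetColumns();
}

class MockContext implements SinkContext {
    @Override
    public boolean isBounded() {
        return true;
    }

    // without this override, MockContext "is not abstract and does not override
    // abstract method targetColumns()" -- the same shape of error as above
    @Override
    public Optional<int[][]> targetColumns() {
        return Optional.empty();
    }
}

public class InterfaceEvolutionDemo {
    public static void main(String[] args) {
        SinkContext ctx = new MockContext();
        System.out.println(ctx.isBounded() + " / " + ctx.targetColumns());
    }
}
   ```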
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31301) Unsupported nested columns in column list of insert statement

2023-04-05 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708948#comment-17708948
 ] 

Aitozi commented on FLINK-31301:


Hello [~lincoln.86xy], I would like to contribute to this ticket and have 
basically finished the work. Could you assign it to me?

> Unsupported nested columns in column list of insert statement
> -
>
> Key: FLINK-31301
> URL: https://issues.apache.org/jira/browse/FLINK-31301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Priority: Major
>
> Currently an error is raised when using nested columns in the column list of 
> an insert statement, e.g.,
> {code:java}
> INSERT INTO nested_type_sink (a,b.b1,c.c2,f)
> SELECT a,b.b1,c.c2,f FROM nested_type_src
> {code}
>  
> {code:java}
> java.lang.AssertionError
>     at org.apache.calcite.sql.SqlIdentifier.getSimple(SqlIdentifier.java:333)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTargetField(SqlValidatorUtil.java:612)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:171)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-pulsar] reswqa commented on pull request #42: [hotfix] Improve CI trigger and cancel strategy

2023-04-05 Thread via GitHub


reswqa commented on PR #42:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/42#issuecomment-1497627697

   Thanks @tisonkun, the explanation of the `Concurrency strategy` in this PR 
is very useful  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on pull request #22350: [FLINK-31471] Allow setting JobResourceRequirements through WEB UI.

2023-04-05 Thread via GitHub


dmvk commented on PR #22350:
URL: https://github.com/apache/flink/pull/22350#issuecomment-1497623195

   Thanks for the review Chesnay, I've addressed the comments and scheduled 
some follow-ups. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22358: [FLINK-31738] Prevent name clash in generated clients

2023-04-05 Thread via GitHub


flinkbot commented on PR #22358:
URL: https://github.com/apache/flink/pull/22358#issuecomment-1497621576

   
   ## CI report:
   
   * 8db2f4bab4db12d8c90208cbb4b09b8ff3d61a30 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #22356: WARNING: An illegal reflective access operation has occurred

2023-04-05 Thread via GitHub


zentol commented on PR #22356:
URL: https://github.com/apache/flink/pull/22356#issuecomment-1497617569

   See https://flink.apache.org/getting-help/.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol closed pull request #22356: WARNING: An illegal reflective access operation has occurred

2023-04-05 Thread via GitHub


zentol closed pull request #22356: WARNING: An illegal reflective access 
operation has occurred
URL: https://github.com/apache/flink/pull/22356


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31738) FlameGraphTypeQueryParameter#Type clashes with java.reflect.Type in generated clients

2023-04-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31738:
---
Labels: pull-request-available  (was: )

> FlameGraphTypeQueryParameter#Type clashes with java.reflect.Type in generated 
> clients
> -
>
> Key: FLINK-31738
> URL: https://issues.apache.org/jira/browse/FLINK-31738
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / REST
>Affects Versions: 1.17.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.1
>
>
> Generating a client with the OpenAPI generators causes compile errors because 
> the generated file imports java.lang.reflect.Type but also the generated 
> "Type" model.
> For convenience it would be neat to give this enum a slightly different name, 
> because working around this issue is surprisingly annoying.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol opened a new pull request, #22358: [FLINK-31738] Prevent name clash in generated clients

2023-04-05 Thread via GitHub


zentol opened a new pull request, #22358:
URL: https://github.com/apache/flink/pull/22358

   Generating a client with the OpenAPI generators causes compile errors 
because the generated file imports java.lang.reflect.Type but also the 
generated "Type" model.
   
   For convenience it would be neat to give this enum a slightly different 
name, because working around this issue is surprisingly annoying.
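   
   To make the clash concrete, a minimal, self-contained sketch (this is not 
the generated client code; `FlameGraphKind` is an invented stand-in for 
whatever new name the enum ends up with):
   
   ```java
// Minimal illustration of the name clash; not the generated client code.
import java.lang.reflect.Type;

public class NameClashSketch {

    // stand-in for the generated model that used to be called "Type"
    enum FlameGraphKind { FULL, ON_CPU, OFF_CPU }

    public static void main(String[] args) {
        Type reflected = String.class;               // java.lang.reflect.Type
        FlameGraphKind kind = FlameGraphKind.ON_CPU; // no longer shadows the import
        System.out.println(reflected.getTypeName() + " / " + kind);
    }
}
   ```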
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31470) Introduce integration tests for Externalized Declarative Resource Management

2023-04-05 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-31470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708937#comment-17708937
 ] 

David Morávek commented on FLINK-31470:
---

master: d4e3b6646f389eeb395a4dbb951d13bab02cb8db

> Introduce integration tests for Externalized Declarative Resource Management
> 
>
> Key: FLINK-31470
> URL: https://issues.apache.org/jira/browse/FLINK-31470
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31470) Introduce integration tests for Externalized Declarative Resource Management

2023-04-05 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek resolved FLINK-31470.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

> Introduce integration tests for Externalized Declarative Resource Management
> 
>
> Key: FLINK-31470
> URL: https://issues.apache.org/jira/browse/FLINK-31470
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31738) FlameGraphTypeQueryParameter#Type clashes with java.reflect.Type in generated clients

2023-04-05 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-31738:


 Summary: FlameGraphTypeQueryParameter#Type clashes with 
java.reflect.Type in generated clients
 Key: FLINK-31738
 URL: https://issues.apache.org/jira/browse/FLINK-31738
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Runtime / REST
Affects Versions: 1.17.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0, 1.17.1


Generating a client with the OpenAPI generators causes compile errors because 
the generated file imports java.lang.reflect.Type but also the generated "Type" 
model.

For convenience it would be neat to give this enum a slightly different name, 
because working around this issue is surprisingly annoying.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dmvk merged pull request #22343: [FLINK-31470] Add integration tests for Externalized Declarative Reso…

2023-04-05 Thread via GitHub


dmvk merged PR #22343:
URL: https://github.com/apache/flink/pull/22343


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-04-05 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-30989:
---
Fix Version/s: 1.16.2

> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size in the TableEnv 
> of my job and it did not take effect. I attached to the TaskManager and found 
> that the conf passed to the constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple code to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static 
> org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays; public class App {
>   public static void main(String argc[]) throws Exception {
> Configuration config = new Configuration();
> config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
> config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, 
> true);
> config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
> config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); 
> // < cannot take effect
> config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(1, config);
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 
> m"); // < cannot take effect
> final DataStream<Order> orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> final Table tableA = tableEnv.fromDataStream(orderA);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM "
> + tableA
> + " "
> + " order by user");
> tableEnv.toDataStream(result, Order.class).print();
> env.execute();
>   }
> }
> // ---
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
>   }
>   @Override
>   public String toString() {
> return "Order{"
> + "user="
> + user
> + ", product='"
> + product
> + '\''
> + ", amount="
> + amount
> + '}';
>   }
> }{code}
>  
> I think it is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  tries to get the conf from the JobConfiguration, which should be set in the 
> JobGraph. The following classes use the same method to get conf from the 
> JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
>  * 

[jira] [Updated] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-04-05 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-30989:
---
Fix Version/s: 1.17.1

> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.1
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size in the TableEnv 
> of my job and it did not take effect. I attached to the TaskManager and found 
> that the conf passed to the constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple code to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static 
> org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays; public class App {
>   public static void main(String argc[]) throws Exception {
> Configuration config = new Configuration();
> config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
> config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, 
> true);
> config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
> config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); 
> // < cannot take effect
> config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(1, config);
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 
> m"); // < cannot take effect
> final DataStream<Order> orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> final Table tableA = tableEnv.fromDataStream(orderA);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM "
> + tableA
> + " "
> + " order by user");
> tableEnv.toDataStream(result, Order.class).print();
> env.execute();
>   }
> }
> // ---
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
>   }
>   @Override
>   public String toString() {
> return "Order{"
> + "user="
> + user
> + ", product='"
> + product
> + '\''
> + ", amount="
> + amount
> + '}';
>   }
> }{code}
>  
> I think it is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  tries to get the conf from the JobConfiguration, which should be set in the 
> JobGraph. The following classes use the same method to get conf from the 
> JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
>  * 

[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-04-05 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708930#comment-17708930
 ] 

Weijie Guo commented on FLINK-30989:


Summary:

Fixed streaming module:
master(1.18) via ccd0fe2d75a26a158ad64ab25bb5063a7031d428.
release-1.17 via 40e9501a5fcd7a71af4a7e79cd1556e190488137
release-1.16 via 75fab2759a1a2a2664d6b9f4a006a56f1a65d2fe.

Fixed Table module:
master(1.18) via b4d43b47c993b7b4d5e4f7a78610c54124fcbcb4.
release-1.17 via 333088113993f4607038dae391863b5c30d0bc95.
release-1.16: waiting for confirmation.

> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size in the TableEnv 
> of my job and it did not take effect. I attached to the TaskManager and found 
> that the conf passed to the constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple code to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static 
> org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays; public class App {
>   public static void main(String argc[]) throws Exception {
> Configuration config = new Configuration();
> config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
> config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, 
> true);
> config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
> config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); 
> // < cannot take effect
> config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(1, config);
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 
> m"); // < cannot take effect
> final DataStream<Order> orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> final Table tableA = tableEnv.fromDataStream(orderA);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM "
> + tableA
> + " "
> + " order by user");
> tableEnv.toDataStream(result, Order.class).print();
> env.execute();
>   }
> }
> // ---
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
>   }
>   @Override
>   public String toString() {
> return "Order{"
> + "user="
> + user
> + ", product='"
> + product
> + '\''
> + ", amount="
> + amount
> + '}';
>   }
> }{code}
>  
> I think it is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  tries to get the conf from the JobConfiguration, which should be set in the JobGraph. 
> 
