[flink] branch master updated: [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished

2023-06-19 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 8119411addd [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished
8119411addd is described below

commit 8119411addd9c82c15bab8480e7b35b8e6394d43
Author: Shammon FY 
AuthorDate: Mon Jun 19 10:17:19 2023 +0800

[FLINK-32370][streaming] Fix warn log in result fetcher when job is finished

Close apache/flink#22819
---
 .../client/program/rest/RestClusterClientTest.java | 37 ++
 .../operators/collect/CollectResultFetcher.java|  7 ++--
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 2ff6dea2a98..740eb06a57b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -122,6 +123,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -1136,6 +1138,32 @@ class RestClusterClientTest {
 }
 }
 
+    @Test
+    void testSendCoordinationRequestException() throws Exception {
+        final TestClientCoordinationHandler handler =
+                new TestClientCoordinationHandler(new FlinkJobNotFoundException(jobId));
+        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) {
+            try (RestClusterClient restClusterClient =
+                    createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+                String payload = "testing payload";
+                TestCoordinationRequest request = new TestCoordinationRequest<>(payload);
+
+                assertThatThrownBy(
+                                () ->
+                                        restClusterClient
+                                                .sendCoordinationRequest(
+                                                        jobId, new OperatorID(), request)
+                                                .get())
+                        .matches(
+                                e ->
+                                        ExceptionUtils.findThrowableWithMessage(
+                                                        e,
+                                                        FlinkJobNotFoundException.class.getName())
+                                                .isPresent());
+            }
+        }
+    }
+
 /**
  * The SUSPENDED job status should never be returned by the client thus client retries until it
  * either receives a different job status or the cluster is not reachable.
@@ -1166,9 +1194,15 @@ class RestClusterClientTest {
 ClientCoordinationRequestBody,
 ClientCoordinationResponseBody,
 ClientCoordinationMessageParameters> {
+@Nullable private final FlinkJobNotFoundException exception;
 
         private TestClientCoordinationHandler() {
+            this(null);
+        }
+
+        private TestClientCoordinationHandler(@Nullable FlinkJobNotFoundException exception) {
             super(ClientCoordinationHeaders.getInstance());
+            this.exception = exception;
         }
 
 @Override
@@ -1178,6 +1212,9 @@ class RestClusterClientTest {
 @Nonnull DispatcherGateway gateway)
 throws RestHandlerException {
 try {
+if (exception != null) {
+throw exception;
+}
 TestCoordinationRequest req =
 (TestCoordinationRequest)
 request.getRequestBody()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
index 519b7d603f1..a7916502214 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
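
The CollectResultFetcher hunk is cut off above. As a hedged sketch of the pattern the commit title describes -- demoting the fetch warning once the job has reached a terminal state -- the following is an illustration only, not the actual Flink change; the class name, the jobTerminated flag and the onFetchError method are made-up stand-ins (assuming slf4j on the classpath):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ResultFetchLoggingSketch {
    private static final Logger LOG =
            LoggerFactory.getLogger(ResultFetchLoggingSketch.class);

    // Assumed flag, flipped once the job is observed in a terminal state.
    private volatile boolean jobTerminated;

    void onFetchError(Throwable cause) {
        if (jobTerminated) {
            // A failed coordination request is expected after the job has
            // finished, so log it quietly instead of alarming users.
            LOG.debug("Fetching results after job termination failed.", cause);
        } else {
            LOG.warn("An exception occurred when fetching query results.", cause);
        }
    }
}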

[flink] branch master updated (4586bcbe662 -> b462d0ec4d1)

2023-06-19 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 4586bcbe662 [FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711)
 new 7e42493b021 [hotfix] Enrich hybridPartitionDataConsumeConstraint when create AdaptiveBatchScheduler in scheduler benchmark utils.
 new b462d0ec4d1 [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../AllFinishedInputConsumableDecider.java |  5 +++--
 .../strategy/DefaultInputConsumableDecider.java| 13 +
 .../scheduler/strategy/InputConsumableDecider.java | 10 +-
 .../PartialFinishedInputConsumableDecider.java |  5 +++--
 .../strategy/VertexwiseSchedulingStrategy.java |  5 +
 .../benchmark/SchedulerBenchmarkUtils.java |  2 ++
 .../DefaultInputConsumableDeciderTest.java | 22 ++
 .../strategy/TestingInputConsumableDecider.java|  6 ++
 8 files changed, 63 insertions(+), 5 deletions(-)



[flink] 02/02: [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler

2023-06-19 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b462d0ec4d1d421a369e45f8dca33284b5be6bc2
Author: sunxia 
AuthorDate: Fri Jun 16 10:14:21 2023 +0800

[FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler

This closes #22798.
---
 .../AllFinishedInputConsumableDecider.java |  5 +++--
 .../strategy/DefaultInputConsumableDecider.java| 13 +
 .../scheduler/strategy/InputConsumableDecider.java | 10 +-
 .../PartialFinishedInputConsumableDecider.java |  5 +++--
 .../strategy/VertexwiseSchedulingStrategy.java |  5 +
 .../DefaultInputConsumableDeciderTest.java | 22 ++
 .../strategy/TestingInputConsumableDecider.java|  6 ++
 7 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
index 6c23757a1a7..f8cbb260488 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
@@ -37,14 +37,15 @@ public class AllFinishedInputConsumableDecider implements InputConsumableDecider
                 executionVertex.getConsumedPartitionGroups()) {
 
             if (!consumableStatusCache.computeIfAbsent(
-                    consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
+                    consumedPartitionGroup, this::isConsumableBasedOnFinishedProducers)) {
                 return false;
             }
         }
         return true;
     }
 
-    private boolean isConsumedPartitionGroupConsumable(
+    @Override
+    public boolean isConsumableBasedOnFinishedProducers(
             final ConsumedPartitionGroup consumedPartitionGroup) {
         return consumedPartitionGroup.getNumberOfUnfinishedPartitions() == 0;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
index 93db09c94b6..ccd354b0d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
@@ -65,6 +65,19 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider {
 return true;
 }
 
+    @Override
+    public boolean isConsumableBasedOnFinishedProducers(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
+            // For canBePipelined consumed partition group, whether it is consumable does not
+            // depend on task finish. To optimize performance and avoid unnecessary computation,
+            // we simply return false.
+            return false;
+        } else {
+            return consumedPartitionGroup.areAllPartitionsFinished();
+        }
+    }
+
     private boolean isConsumedPartitionGroupConsumable(
             final ConsumedPartitionGroup consumedPartitionGroup,
             final Set verticesToSchedule) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
index e34cb06bc4e..1d19dd2cf62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
@@ -24,7 +24,7 @@ import java.util.function.Function;
 
 /**
  * {@link InputConsumableDecider} is responsible for determining whether the input of an
- * executionVertex is consumable.
+ * executionVertex or a consumed partition group is consumable.
  */
 public interface InputConsumableDecider {
 /**
@@ -41,6 +41,14 @@ public interface InputConsumableDecider {
 Set verticesToSchedule,
 Map consumableStatusCache);
 
+    /**
+     * Determining whether the consumed partition group is consumable based on finished
+     * producers.
+     *
+     * @param consumedPartitionGroup to be determined whether it is consumable.
+     */
+    boolean isConsumableBasedOnFinishedProducers(
+            final ConsumedPartitionGroup consumedPartitionGroup);
+
 /** Factory for {@link InputConsumableDecider}. */
 interface Factory {
 InputConsumableDecider createInstance(
diff 
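
The interface change above pairs with the per-pass caching visible in the AllFinishedInputConsumableDecider hunk: each ConsumedPartitionGroup is evaluated at most once per scheduling pass via Map#computeIfAbsent. A minimal, self-contained sketch of that memoization pattern, where PartitionGroup and the predicate are stand-ins for Flink's ConsumedPartitionGroup and InputConsumableDecider:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ConsumableCacheSketch {
    record PartitionGroup(String id, int unfinishedProducers) {}

    static boolean allInputsConsumable(
            Iterable<PartitionGroup> groups,
            Map<PartitionGroup, Boolean> cache,
            Predicate<PartitionGroup> decider) {
        for (PartitionGroup group : groups) {
            // computeIfAbsent memoizes the (potentially expensive) check,
            // so repeated lookups for the same group are free.
            if (!cache.computeIfAbsent(group, decider::test)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<PartitionGroup, Boolean> cache = new HashMap<>();
        PartitionGroup g = new PartitionGroup("g1", 0);
        // "All producers finished" decider, mirroring AllFinishedInputConsumableDecider.
        System.out.println(allInputsConsumable(List.of(g), cache, p -> p.unfinishedProducers() == 0));
    }
}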

[flink] 01/02: [hotfix] Enrich hybridPartitionDataConsumeConstraint when create AdaptiveBatchScheduler in scheduler benchmark utils.

2023-06-19 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e42493b021e3e012dd429e89588a54d8b151dfd
Author: sunxia 
AuthorDate: Thu Jun 15 18:12:45 2023 +0800

[hotfix] Enrich hybridPartitionDataConsumeConstraint when create AdaptiveBatchScheduler in scheduler benchmark utils.
---
 .../flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
index 2368e781ec4..a60b3fd584b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
@@ -135,6 +135,8 @@ public class SchedulerBenchmarkUtils {
         return schedulerBuilder
                 .setVertexParallelismAndInputInfosDecider(
                         createCustomParallelismDecider(jobConfiguration.getParallelism()))
+                .setHybridPartitionDataConsumeConstraint(
+                        jobConfiguration.getHybridPartitionDataConsumeConstraint())
                 .setInputConsumableDeciderFactory(
                         loadInputConsumableDeciderFactory(
                                 jobConfiguration.getHybridPartitionDataConsumeConstraint()))



[flink] branch release-1.17 updated: [BP-1.17][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22825)

2023-06-19 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 3b5c1f7915e [BP-1.17][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22825)
3b5c1f7915e is described below

commit 3b5c1f7915ecfe0f7e353fd342f8d22df5bcd7c4
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Tue Jun 20 09:48:08 2023 +0800

[BP-1.17][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22825)
---
 .../table/planner/plan/ExecNodeGraphInternalPlan.java  |  1 +
 .../org/apache/flink/table/api/CompiledPlanITCase.java |  9 -
 .../apache/flink/table/planner/utils/TableTestBase.scala   | 14 ++
 3 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
index 576426a63f3..71c9b13b04e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
@@ -79,6 +79,7 @@ public class ExecNodeGraphInternalPlan implements InternalPlan {
 file.toPath(),
 serializedPlan.getBytes(StandardCharsets.UTF_8),
 StandardOpenOption.CREATE,
+StandardOpenOption.TRUNCATE_EXISTING,
 StandardOpenOption.WRITE);
 } catch (IOException e) {
             throw new TableException("Cannot write the compiled plan to file '" + file + "'.", e);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 887d8a8d791..b5485097aa6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -258,7 +259,8 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
         TableResult tableResult =
                 tableEnv.executeSql(
                         String.format(
-                                "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src",
+                                "COMPILE PLAN '%s' FOR INSERT INTO sink "
+                                        + "SELECT IF(a > b, a, b) AS a, b + 1 AS b, SUBSTR(c, 1, 4) AS c FROM src WHERE a > 10",
                                 planPath));
 
         assertThat(tableResult).isEqualTo(TableResultInternal.TABLE_RESULT_OK);
@@ -271,6 +273,11 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
                                         "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src",
                                         planPath)))
                 .isEqualTo(TableResultInternal.TABLE_RESULT_OK);
+        assertThat(
+                        TableTestUtil.isValidJson(
+                                FileUtils.readFileToString(
+                                        planPath.toFile(), StandardCharsets.UTF_8)))
+                .isTrue();
 
         tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 0615b66b671..ef570f18156 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.BatchExecutionOptions
 import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException
 import 
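
Why TRUNCATE_EXISTING matters in this fix: with only CREATE and WRITE, java.nio overwrites from position zero but keeps any trailing bytes of the previous, longer content, which leaves an invalid JSON plan behind. A self-contained demonstration of the difference (the temp-file path and JSON payloads are made up):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("plan", ".json");
        Files.write(file, "{\"plan\":\"long-old-content\"}".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);

        // Overwrite with shorter content but WITHOUT truncation: the old
        // tail survives and the file is no longer valid JSON.
        Files.write(file, "{\"v\":1}".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        System.out.println(Files.readString(file));

        // With TRUNCATE_EXISTING the old tail is discarded first.
        Files.write(file, "{\"v\":1}".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE);
        System.out.println(Files.readString(file));
    }
}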

[flink] branch release-1.16 updated (d4bdca1f76e -> 08bced4646c)

2023-06-19 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from d4bdca1f76e [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture
 add 08bced4646c [BP-1.16][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22826)

No new revisions were added by this update.

Summary of changes:
 .../table/planner/plan/ExecNodeGraphInternalPlan.java  |  1 +
 .../org/apache/flink/table/api/CompiledPlanITCase.java |  9 -
 .../apache/flink/table/planner/utils/TableTestBase.scala   | 14 ++
 3 files changed, 23 insertions(+), 1 deletion(-)



[flink] branch master updated: [FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711)

2023-06-19 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4586bcbe662 [FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711)
4586bcbe662 is described below

commit 4586bcbe6627ad91001838507b9e1e68ddfc7cdf
Author: yuxia Luo 
AuthorDate: Tue Jun 20 09:02:28 2023 +0800

[FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711)

Co-authored-by: fengli 
---
 .../src/main/codegen/data/Parser.tdd   |  3 ++
 .../src/main/codegen/includes/parserImpls.ftl  | 42 
 .../flink/sql/parser/ddl/SqlAddPartitions.java |  7 +++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 19 
 .../operations/SqlNodeToOperationConversion.java   | 15 --
 .../SqlAlterTableAddPartitionConverter.java| 56 ++
 .../operations/converters/SqlNodeConverters.java   |  1 +
 .../operations/SqlDdlToOperationConverterTest.java | 28 +++
 .../flink/table/api/TableEnvironmentTest.scala | 51 
 9 files changed, 207 insertions(+), 15 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 27289112a3e..3fb6fe854f3 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,6 +31,8 @@
 "org.apache.flink.sql.parser.ddl.resource.SqlResource"
 "org.apache.flink.sql.parser.ddl.resource.SqlResourceType"
 "org.apache.flink.sql.parser.ddl.SqlAddJar"
+"org.apache.flink.sql.parser.ddl.SqlAddPartitions"
+"org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext"
 "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
 "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
 "org.apache.flink.sql.parser.ddl.SqlAlterTable"
@@ -122,6 +124,7 @@
 "org.apache.calcite.sql.SqlCreate"
 "org.apache.calcite.sql.SqlDrop"
 "java.util.ArrayList"
+"java.util.Collections"
 "java.util.HashSet"
 "java.util.List"
 "java.util.Set"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 60cb2e038c9..25af4e20f7e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -591,6 +591,7 @@ SqlAlterTable SqlAlterTable() :
 SqlIdentifier tableIdentifier;
 SqlIdentifier newTableIdentifier = null;
 boolean ifExists = false;
+boolean ifNotExists = false;
 SqlNodeList propertyList = SqlNodeList.EMPTY;
 SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
 SqlNodeList partitionSpec = null;
@@ -599,6 +600,7 @@ SqlAlterTable SqlAlterTable() :
 SqlIdentifier originColumnIdentifier;
 SqlIdentifier newColumnIdentifier;
 AlterTableContext ctx = new AlterTableContext();
+    AlterTableAddPartitionContext addPartitionCtx = new AlterTableAddPartitionContext();
 }
 {
   { startPos = getPos(); }
@@ -650,6 +652,16 @@ SqlAlterTable SqlAlterTable() :
 }
 |
 
+(
+AlterTableAddPartition(addPartitionCtx)
+{
+return new SqlAddPartitions(startPos.plus(getPos()),
+tableIdentifier,
+addPartitionCtx.ifNotExists,
+addPartitionCtx.partSpecs,
+addPartitionCtx.partProps);
+}
+|
 (
 AlterTableAddOrModify(ctx)
 |
@@ -669,6 +681,7 @@ SqlAlterTable SqlAlterTable() :
 ctx.watermark,
 ifExists);
 }
+)
 |
 
 (
@@ -923,6 +936,35 @@ SqlTableColumn RegularColumn(TableCreationContext context, SqlIdentifier name, S
 }
 }
 
+/** Parses {@code ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec [PARTITION partition_spec][...]}. */
+void AlterTableAddPartition(AlterTableAddPartitionContext context) :
+{
+List partSpecs = new ArrayList();
+List partProps = new ArrayList();
+SqlNodeList partSpec;
+SqlNodeList partProp;
+}
+{
+context.ifNotExists = IfNotExistsOpt()
+(
+
+{
+partSpec = new SqlNodeList(getPos());
+partProp = null;
+PartitionSpecCommaList(partSpec);
+}
+[  { partProp = TableProperties(); } ]
+{
+partSpecs.add(partSpec);
+partProps.add(partProp);
+}
+)+
+{
+context.partSpecs = partSpecs;
+context.partProps = partProps;
+}
+}
+
 /** Parses {@code ALTER TABLE table_name ADD/MODIFY 
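
A usage sketch for the new statement, following the parser rule documented above. TableEnvironment.create and executeSql are the regular Table API entry points; the table name, partition columns and values are made up, and the statement requires a catalog whose tables are partitioned (e.g. Hive):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AddPartitionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec [PARTITION partition_spec][...]
        tEnv.executeSql(
                "ALTER TABLE sales ADD IF NOT EXISTS "
                        + "PARTITION (dt = '2023-06-19', region = 'eu') "
                        + "PARTITION (dt = '2023-06-19', region = 'us')");
    }
}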

[flink-connector-elasticsearch] branch v3.0 updated: [hotfix] Allow unmatched tests (#70)

2023-06-19 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/v3.0 by this push:
 new 36409d2  [hotfix] Allow unmatched tests (#70)
36409d2 is described below

commit 36409d2683de67558e5977b29dc21bd359a5afec
Author: Sergey Nuyanzin 
AuthorDate: Mon Jun 19 13:36:25 2023 +0200

[hotfix] Allow unmatched tests (#70)
---
 .../src/test/resources/archunit.properties  | 2 ++
 flink-connector-elasticsearch6/src/test/resources/archunit.properties   | 2 ++
 flink-connector-elasticsearch7/src/test/resources/archunit.properties   | 2 ++
 3 files changed, 6 insertions(+)

diff --git a/flink-connector-elasticsearch-base/src/test/resources/archunit.properties b/flink-connector-elasticsearch-base/src/test/resources/archunit.properties
index 15be88c..48011f9 100644
--- a/flink-connector-elasticsearch-base/src/test/resources/archunit.properties
+++ b/flink-connector-elasticsearch-base/src/test/resources/archunit.properties
@@ -29,3 +29,5 @@ freeze.store.default.allowStoreUpdate=true
 #freeze.refreeze=true
 
 freeze.store.default.path=archunit-violations
+
+archRule.failOnEmptyShould = false
diff --git a/flink-connector-elasticsearch6/src/test/resources/archunit.properties b/flink-connector-elasticsearch6/src/test/resources/archunit.properties
index 15be88c..48011f9 100644
--- a/flink-connector-elasticsearch6/src/test/resources/archunit.properties
+++ b/flink-connector-elasticsearch6/src/test/resources/archunit.properties
@@ -29,3 +29,5 @@ freeze.store.default.allowStoreUpdate=true
 #freeze.refreeze=true
 
 freeze.store.default.path=archunit-violations
+
+archRule.failOnEmptyShould = false
diff --git a/flink-connector-elasticsearch7/src/test/resources/archunit.properties b/flink-connector-elasticsearch7/src/test/resources/archunit.properties
index 15be88c..48011f9 100644
--- a/flink-connector-elasticsearch7/src/test/resources/archunit.properties
+++ b/flink-connector-elasticsearch7/src/test/resources/archunit.properties
@@ -29,3 +29,5 @@ freeze.store.default.allowStoreUpdate=true
 #freeze.refreeze=true
 
 freeze.store.default.path=archunit-violations
+
+archRule.failOnEmptyShould = false



[flink-kubernetes-operator] branch main updated: [FLINK-32334] Also check if no taskmanager are running while waiting for cluster shutdown

2023-06-19 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 4d9615f5 [FLINK-32334] Also check if no taskmanager are running while waiting for cluster shutdown
4d9615f5 is described below

commit 4d9615f5c76672c9b324ed8f4876d62af7fef60e
Author: Nicolas Fraison <6404013+ashan...@users.noreply.github.com>
AuthorDate: Mon Jun 19 11:57:53 2023 +0200

[FLINK-32334] Also check if no taskmanager are running while waiting for cluster shutdown
---
 .../operator/service/AbstractFlinkService.java | 12 -
 .../operator/service/NativeFlinkService.java   |  6 +
 .../operator/service/StandaloneFlinkService.java   |  9 +++
 .../kubernetes/operator/TestingFlinkService.java   |  5 
 .../service/StandaloneFlinkServiceTest.java| 31 --
 5 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 90866abd..82aa0cc0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -157,6 +157,8 @@ public abstract class AbstractFlinkService implements FlinkService {
 
     protected abstract PodList getJmPodList(String namespace, String clusterId);
 
+    protected abstract PodList getTmPodList(String namespace, String clusterId);
+
     protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration conf)
             throws Exception;
 
@@ -819,6 +821,7 @@ public abstract class AbstractFlinkService implements FlinkService {
 LOG.info("Waiting for cluster shutdown...");
 
 boolean jobManagerRunning = true;
+boolean taskManagerRunning = true;
 boolean serviceRunning = true;
 
 for (int i = 0; i < shutdownTimeout; i++) {
@@ -829,6 +832,13 @@ public abstract class AbstractFlinkService implements FlinkService {
 jobManagerRunning = false;
 }
 }
+if (taskManagerRunning) {
+PodList tmPodList = getTmPodList(namespace, clusterId);
+
+if (tmPodList.getItems().isEmpty()) {
+taskManagerRunning = false;
+}
+}
 
 if (serviceRunning) {
 Service service =
@@ -843,7 +853,7 @@ public abstract class AbstractFlinkService implements FlinkService {
 }
 }
 
-if (!jobManagerRunning && !serviceRunning) {
+if (!jobManagerRunning && !serviceRunning && !taskManagerRunning) {
 break;
 }
 // log a message waiting to shutdown Flink cluster every 5 seconds.
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 912a19e1..b1191e21 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -93,6 +93,12 @@ public class NativeFlinkService extends AbstractFlinkService {
 .list();
 }
 
+@Override
+protected PodList getTmPodList(String namespace, String clusterId) {
+// Native mode does not manage TaskManager
+return new PodList();
+}
+
 protected void submitClusterInternal(Configuration conf) throws Exception {
 LOG.info("Deploying session cluster");
 final ClusterClientServiceLoader clusterClientServiceLoader =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index bdc6b65f..77aa08b8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -90,6 +90,15 @@ public class StandaloneFlinkService extends AbstractFlinkService {
 .list();
 }
 
+@Override
+protected PodList getTmPodList(String namespace, String clusterId) {
+return kubernetesClient
+.pods()
+.inNamespace(namespace)
+
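
A minimal sketch of the polling pattern the AbstractFlinkService diff above extends to TaskManager pods: re-check a set of "still running" probes once per second until all report gone or the timeout elapses. The suppliers are stand-ins for the JM-pod, TM-pod and Service checks, and unlike the real code this simplified version does not cache probes that already returned "gone":

import java.time.Duration;
import java.util.function.BooleanSupplier;

class ShutdownWaiterSketch {
    static boolean waitForShutdown(Duration timeout, BooleanSupplier... stillRunningChecks)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            boolean anyRunning = false;
            for (BooleanSupplier check : stillRunningChecks) {
                // Probe each resource; all must be gone before we stop waiting.
                anyRunning |= check.getAsBoolean();
            }
            if (!anyRunning) {
                return true; // everything is shut down
            }
            Thread.sleep(1000);
        }
        return false; // timed out with resources still present
    }

    public static void main(String[] args) throws InterruptedException {
        // Both probes report "gone" immediately, so this returns true at once.
        System.out.println(waitForShutdown(Duration.ofSeconds(5), () -> false, () -> false));
    }
}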

[flink] branch master updated (1a3151b5b9f -> de0efcbe533)

2023-06-19 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 1a3151b5b9f [FLINK-32233][sql] Introduce SupportsTruncate interface (#22692)
 add de0efcbe533 [FLINK-31782][runtime] Makes DefaultLeaderElectionService implement MultipleComponentLeaderElectionDriver.Listener (#22640)

No new revisions were added by this update.

Summary of changes:
 .../DefaultLeaderElectionService.java  | 30 +-
 1 file changed, 29 insertions(+), 1 deletion(-)



[flink] branch master updated: [FLINK-32233][sql] Introduce SupportsTruncate interface (#22692)

2023-06-19 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1a3151b5b9f [FLINK-32233][sql] Introduce SupportsTruncate interface (#22692)
1a3151b5b9f is described below

commit 1a3151b5b9f1df8aaa221b2febfae0431a69b95d
Author: yuxia Luo 
AuthorDate: Mon Jun 19 17:28:52 2023 +0800

[FLINK-32233][sql] Introduce SupportsTruncate interface (#22692)
---
 .../connector/sink/abilities/SupportsTruncate.java | 43 ++
 1 file changed, 43 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTruncate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTruncate.java
new file mode 100644
index 000..6b5a43d06df
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTruncate.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables to delete all existing data in a {@link DynamicTableSink} table using {@code TRUNCATE
+ * TABLE} statement.
+ *
+ * For {@code TRUNCATE TABLE} statement, if the corresponding {@link DynamicTableSink} have
+ * implemented this interface, then the method {@link #executeTruncation()} will be invoked in
+ * execution phase. Otherwise, Flink will throw exception directly.
+ */
+@PublicEvolving
+public interface SupportsTruncate {
+
+    /**
+     * Execute truncating table.
+     *
+     * Note: please remember to throw exception if the truncation hasn't been executed
+     * successfully, otherwise it'll still be considered to have been executed successfully by
+     * Flink.
+     */
+    void executeTruncation();
+}
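
A hedged sketch of how a connector could opt in to the new ability. A real implementation would live on a class that also implements DynamicTableSink; the StorageClient here is a made-up stand-in for the connector's storage handle:

import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;

class TruncatableSinkSketch implements SupportsTruncate {

    interface StorageClient { // hypothetical stand-in for the connector's storage API
        void deleteAllRows() throws Exception;
    }

    private final StorageClient client;

    TruncatableSinkSketch(StorageClient client) {
        this.client = client;
    }

    @Override
    public void executeTruncation() {
        try {
            client.deleteAllRows();
        } catch (Exception e) {
            // Per the Javadoc above: surface failures, otherwise Flink
            // assumes the truncation succeeded.
            throw new RuntimeException("Truncating the table failed", e);
        }
    }
}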



[flink] branch master updated: [FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820)

2023-06-19 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f69ed3454f2 [FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820)
f69ed3454f2 is described below

commit f69ed3454f2ab200310edee230da292ee2408503
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Mon Jun 19 17:22:21 2023 +0800

[FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820)
---
 .../table/planner/plan/ExecNodeGraphInternalPlan.java  |  1 +
 .../org/apache/flink/table/api/CompiledPlanITCase.java |  9 -
 .../apache/flink/table/planner/utils/TableTestBase.scala   | 14 ++
 3 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
index 576426a63f3..71c9b13b04e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
@@ -79,6 +79,7 @@ public class ExecNodeGraphInternalPlan implements InternalPlan {
 file.toPath(),
 serializedPlan.getBytes(StandardCharsets.UTF_8),
 StandardOpenOption.CREATE,
+StandardOpenOption.TRUNCATE_EXISTING,
 StandardOpenOption.WRITE);
 } catch (IOException e) {
             throw new TableException("Cannot write the compiled plan to file '" + file + "'.", e);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 887d8a8d791..b5485097aa6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -258,7 +259,8 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
         TableResult tableResult =
                 tableEnv.executeSql(
                         String.format(
-                                "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src",
+                                "COMPILE PLAN '%s' FOR INSERT INTO sink "
+                                        + "SELECT IF(a > b, a, b) AS a, b + 1 AS b, SUBSTR(c, 1, 4) AS c FROM src WHERE a > 10",
                                 planPath));
 
         assertThat(tableResult).isEqualTo(TableResultInternal.TABLE_RESULT_OK);
@@ -271,6 +273,11 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
                                         "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src",
                                         planPath)))
                 .isEqualTo(TableResultInternal.TABLE_RESULT_OK);
+        assertThat(
+                        TableTestUtil.isValidJson(
+                                FileUtils.readFileToString(
+                                        planPath.toFile(), StandardCharsets.UTF_8)))
+                .isTrue();
 
         tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 06f782d66c3..1086e65609c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.BatchExecutionOptions
 import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
 import 

[flink] branch master updated: [FLINK-32271][build] Bump snappy-java to 1.1.10.1

2023-06-19 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new ba47237a0a4 [FLINK-32271][build] Bump snappy-java to 1.1.10.1
ba47237a0a4 is described below

commit ba47237a0a44222a275ba7a1d144822466d20f15
Author: Ryan Skraba 
AuthorDate: Fri Jun 16 20:47:24 2023 +0200

[FLINK-32271][build] Bump snappy-java to 1.1.10.1
---
 flink-dist/src/main/resources/META-INF/NOTICE   | 2 +-
 .../flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE   | 2 +-
 flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE | 2 +-
 flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE | 2 +-
 .../src/main/resources/META-INF/NOTICE  | 2 +-
 pom.xml | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE
index 7cbacd9e17c..5d9a217e153 100644
--- a/flink-dist/src/main/resources/META-INF/NOTICE
+++ b/flink-dist/src/main/resources/META-INF/NOTICE
@@ -19,7 +19,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.javassist:javassist:3.24.0-GA
 - org.lz4:lz4-java:1.8.0
 - org.objenesis:objenesis:2.1
-- org.xerial.snappy:snappy-java:1.1.8.3
+- org.xerial.snappy:snappy-java:1.1.10.1
 
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details.
diff --git a/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE
index fa5e0b26f85..a7b54bcf948 100644
--- a/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE
+++ b/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE
@@ -31,7 +31,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.kerby:kerby-asn1:1.0.1
 - org.apache.kerby:kerby-pkix:1.0.1
 - org.apache.kerby:kerby-util:1.0.1
-- org.xerial.snappy:snappy-java:1.1.8.3
+- org.xerial.snappy:snappy-java:1.1.10.1
 
 This project bundles the following dependencies under the MIT (https://opensource.org/licenses/MIT)
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE
index 8f0f179f326..fa67a18717e 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE
@@ -41,7 +41,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.kerby:kerby-pkix:1.0.1
 - org.apache.kerby:kerby-util:1.0.1
 - org.wildfly.openssl:wildfly-openssl:1.0.7.Final
-- org.xerial.snappy:snappy-java:1.1.8.3
+- org.xerial.snappy:snappy-java:1.1.10.1
 - software.amazon.ion:ion-java:1.0.2
 
 This project bundles the following dependencies under BSD-2 License (https://opensource.org/licenses/BSD-2-Clause).
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE
index dc42b334706..ab442ff671f 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE
@@ -55,7 +55,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.kerby:kerby-util:1.0.1
 - org.weakref:jmxutils:1.19
 - org.wildfly.openssl:wildfly-openssl:1.0.7.Final
-- org.xerial.snappy:snappy-java:1.1.8.3
+- org.xerial.snappy:snappy-java:1.1.10.1
 - software.amazon.ion:ion-java:1.0.2
 
 This project bundles the following dependencies under BSD-2 License (https://opensource.org/licenses/BSD-2-Clause).
diff --git a/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE
index 6f6c6786a8d..fb62caf7ea9 100644
--- a/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE
+++ b/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE
@@ -17,7 +17,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.commons:commons-compress:1.21
 - org.apache.kafka:kafka-clients:7.2.2-ccs
 - org.glassfish.jersey.core:jersey-common:2.30
-- org.xerial.snappy:snappy-java:1.1.8.3
+- org.xerial.snappy:snappy-java:1.1.10.1
 
 The binary distribution of this product bundles these dependencies under the Eclipse Public License - v 2.0 (https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.txt)
 
diff 

[flink] branch master updated: [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717)

2023-06-19 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 6e22b94e708 [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717)
6e22b94e708 is described below

commit 6e22b94e708052ac450f84bce954a4d3ec7bb772
Author: Hanyu Zheng <135176127+hanyuzhe...@users.noreply.github.com>
AuthorDate: Mon Jun 19 01:09:20 2023 -0700

[FLINK-31665] [table] Add ARRAY_CONCAT function (#22717)
---
 docs/data/sql_functions.yml|  3 +
 docs/data/sql_functions_zh.yml |  3 +
 .../docs/reference/pyflink.table/expressions.rst   |  1 +
 flink-python/pyflink/table/expression.py   |  9 +++
 .../flink/table/api/internal/BaseExpressions.java  | 35 
 .../functions/BuiltInFunctionDefinitions.java  | 10 +++
 .../table/types/inference/InputTypeStrategies.java |  8 ++
 .../functions/CollectionFunctionsITCase.java   | 94 +-
 .../functions/scalar/ArrayConcatFunction.java  | 79 ++
 9 files changed, 241 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 12b032351f9..87c8b0f9486 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -646,6 +646,9 @@ collection:
   - sql: ARRAY_UNION(array1, array2)
     table: haystack.arrayUnion(array)
     description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null.
+  - sql: ARRAY_CONCAT(array1, ...)
+    table: array1.arrayConcat(...)
+    description: Returns an array that is the result of concatenating at least one array. This array contains all the elements in the first array, followed by all the elements in the second array, and so forth, up to the Nth array. If any input array is NULL, the function returns NULL.
   - sql: MAP_KEYS(map)
     table: MAP.mapKeys()
     description: Returns the keys of the map as array. No order guaranteed.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index efb16a2c633..f5249408a6a 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -724,6 +724,9 @@ collection:
   - sql: CARDINALITY(map)
     table: MAP.cardinality()
     description: 返回 map 中的 entries 数量。
+  - sql: ARRAY_CONCAT(array1, ...)
+    table: array1.arrayConcat(...)
+    description: 返回一个数组,该数组是连接至少一个数组的结果。该数组包含第一个数组中的所有元素,然后是第二个数组中的所有元素,依此类推,直到第 N 个数组。如果任何输入数组为 NULL,则函数返回 NULL。
   - sql: map ‘[’ value ‘]’
     table: MAP.at(ANY)
     description: 返回 map 中指定 key 对应的值。
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst
index 86e93fefccf..0f9ac9bdc1d 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -225,6 +225,7 @@ advanced type helper functions
 Expression.at
 Expression.cardinality
 Expression.element
+Expression.array_concat
 Expression.array_contains
 Expression.array_distinct
 Expression.array_position
diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 52967d542c5..78984fd5d32 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1519,6 +1519,15 @@ class Expression(Generic[T]):
         """
         return _binary_op("arrayUnion")(self, array)
 
+    def array_concat(self, *arrays) -> 'Expression':
+        """
+        Returns an array that is the result of concatenating at least one array.
+        This array contains all the elements in the first array, followed by all
+        the elements in the second array, and so forth, up to the Nth array.
+        If any input array is NULL, the function returns NULL.
+        """
+        return _binary_op("arrayConcat")(self, *arrays)
+
     @property
     def map_keys(self) -> 'Expression':
         """
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index b308c049b19..5caefd52b2d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -53,6 +53,7 @@ import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;

[flink-connector-elasticsearch] branch v3.0 updated: [FLINK-32357] Elasticsearch v3.0 won't compile when testing against Flink 1.17.1

2023-06-19 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/v3.0 by this push:
 new 9876d39  [FLINK-32357] Elasticsearch v3.0 won't compile when testing against Flink 1.17.1
9876d39 is described below

commit 9876d39269b36b2a47f819541fb10e774e573e09
Author: Andriy Redko 
AuthorDate: Mon Jun 19 03:44:03 2023 -0400

[FLINK-32357] Elasticsearch v3.0 won't compile when testing against Flink 1.17.1

Signed-off-by: Andriy Redko 
---
 pom.xml | 18 +-
 1 file changed, 1 insertion(+), 17 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5b088d1..c6ecfa7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,12 +57,11 @@ under the License.
4.13.2
5.8.1
3.21.0
-   0.22.0
1.17.2
2.21.0
 
false
-   1.15.0
+   
3.0.0-1.16.0
 
1.7.36
2.17.2
@@ -337,21 +336,6 @@ under the License.
pom
import

-
-   
-   com.tngtech.archunit
-   archunit
-   ${archunit.version}
-   test
-   
-
-   
-   com.tngtech.archunit
-   archunit-junit5
-   ${archunit.version}
-   test
-   
-


 



[flink] branch master updated (fa94fb5a027 -> 1ca19805ad3)

2023-06-19 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from fa94fb5a027 [FLINK-31721][core] Move JobStatusHook to flink-core module
 add 1ca19805ad3 [hotfix][state/changelog] Fix the description of ChangelogCompatibilityITCase

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/test/state/ChangelogCompatibilityITCase.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)



[flink] branch release-1.16 updated (f9394025fb7 -> d4bdca1f76e)

2023-06-19 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from f9394025fb7 [FLINK-32136][python] Fix the issue that Pyflink gateway server launch fails when purelib != platlib
 add d4bdca1f76e [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture

No new revisions were added by this update.

Summary of changes:
 .../KubernetesCheckpointIDCounterTest.java |  4 ++--
 .../KubernetesLeaderElectionDriverTest.java| 27 +++---
 .../KubernetesStateHandleStoreTest.java|  6 ++---
 .../highavailability/KubernetesTestFixture.java|  5 ++--
 4 files changed, 21 insertions(+), 21 deletions(-)



[flink] branch release-1.17 updated: [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture

2023-06-19 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new c359fe884b6 [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture
c359fe884b6 is described below

commit c359fe884b6e23f47f7904c8be0d3081b05ebd48
Author: Matthias Pohl 
AuthorDate: Fri Jun 16 14:43:15 2023 +0200

[FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture

Signed-off-by: Matthias Pohl 
---
 .../KubernetesCheckpointIDCounterTest.java |  4 ++--
 .../KubernetesLeaderElectionDriverTest.java| 27 +++---
 .../KubernetesStateHandleStoreTest.java|  6 ++---
 .../highavailability/KubernetesTestFixture.java|  5 ++--
 4 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
index 9e206353e17..953cd7522dc 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -159,7 +159,7 @@ class KubernetesCheckpointIDCounterTest extends KubernetesHighAvailabilityTestBa
                         checkpointIDCounter
                                 .shutdown(JobStatus.FINISHED)
                                 .get())
-                        .satisfies(anyCauseMatches(CompletionException.class));
+                        .satisfies(anyCauseMatches(ExecutionException.class));
 
                 // fixing the internal issue should make the shutdown succeed again
                 KubernetesUtils.createConfigMapIfItDoesNotExist(
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
index a7654938cd0..af33b03779e 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.kubernetes.highavailability;
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import org.apache.flink.runtime.leaderelection.LeaderElectionException;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
 
 import org.junit.jupiter.api.Test;
@@ -28,7 +30,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
 import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY;
 import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
@@ -91,9 +92,9 @@ class KubernetesLeaderElectionDriverTest extends KubernetesHighAvailabilityTestB
                 electionEventHandler.waitForError();
                 final String errorMsg =
                         "ConfigMap " + LEADER_CONFIGMAP_NAME + " does not exist.";
-                assertThat(electionEventHandler.getError()).isNotNull();
-                assertThatChainOfCauses(electionEventHandler.getError())
-                        .anySatisfy(t -> assertThat(t).hasMessageContaining(errorMsg));
+                assertThat(electionEventHandler.getError())
+                        .isInstanceOf(KubernetesException.class)
+                        .hasMessage(errorMsg);
             });
         }
     };
@@ -136,9 +137,9 @@ class KubernetesLeaderElectionDriverTest extends KubernetesHighAvailabilityTestB