[flink] branch master updated: [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 8119411addd [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished 8119411addd is described below commit 8119411addd9c82c15bab8480e7b35b8e6394d43 Author: Shammon FY AuthorDate: Mon Jun 19 10:17:19 2023 +0800 [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished Close apache/flink#22819 --- .../client/program/rest/RestClusterClientTest.java | 37 ++ .../operators/collect/CollectResultFetcher.java| 7 ++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 2ff6dea2a98..740eb06a57b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -122,6 +123,7 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -1136,6 +1138,32 @@ class RestClusterClientTest { } } +@Test +void 
testSendCoordinationRequestException() throws Exception { +final TestClientCoordinationHandler handler = +new TestClientCoordinationHandler(new FlinkJobNotFoundException(jobId)); +try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) { +try (RestClusterClient restClusterClient = + createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { +String payload = "testing payload"; +TestCoordinationRequest request = new TestCoordinationRequest<>(payload); + +assertThatThrownBy( +() -> +restClusterClient +.sendCoordinationRequest( +jobId, new OperatorID(), request) +.get()) +.matches( +e -> + ExceptionUtils.findThrowableWithMessage( +e, + FlinkJobNotFoundException.class.getName()) +.isPresent()); +} +} +} + /** * The SUSPENDED job status should never be returned by the client thus client retries until it * either receives a different job status or the cluster is not reachable. @@ -1166,9 +1194,15 @@ class RestClusterClientTest { ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> { +@Nullable private final FlinkJobNotFoundException exception; private TestClientCoordinationHandler() { +this(null); +} + +private TestClientCoordinationHandler(@Nullable FlinkJobNotFoundException exception) { super(ClientCoordinationHeaders.getInstance()); +this.exception = exception; } @Override @@ -1178,6 +1212,9 @@ class RestClusterClientTest { @Nonnull DispatcherGateway gateway) throws RestHandlerException { try { +if (exception != null) { +throw exception; +} TestCoordinationRequest req = (TestCoordinationRequest) request.getRequestBody() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java index 519b7d603f1..a7916502214 100644 ---
[flink] branch master updated (4586bcbe662 -> b462d0ec4d1)
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 4586bcbe662 [FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711) new 7e42493b021 [hotfix] Enrich hybridPartitionDataConsumeConstraint when create AdaptiveBatchScheduler in scheduler benchmark utils. new b462d0ec4d1 [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../AllFinishedInputConsumableDecider.java | 5 +++-- .../strategy/DefaultInputConsumableDecider.java| 13 + .../scheduler/strategy/InputConsumableDecider.java | 10 +- .../PartialFinishedInputConsumableDecider.java | 5 +++-- .../strategy/VertexwiseSchedulingStrategy.java | 5 + .../benchmark/SchedulerBenchmarkUtils.java | 2 ++ .../DefaultInputConsumableDeciderTest.java | 22 ++ .../strategy/TestingInputConsumableDecider.java| 6 ++ 8 files changed, 63 insertions(+), 5 deletions(-)
[flink] 02/02: [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b462d0ec4d1d421a369e45f8dca33284b5be6bc2 Author: sunxia AuthorDate: Fri Jun 16 10:14:21 2023 +0800 [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler This close #22798. --- .../AllFinishedInputConsumableDecider.java | 5 +++-- .../strategy/DefaultInputConsumableDecider.java| 13 + .../scheduler/strategy/InputConsumableDecider.java | 10 +- .../PartialFinishedInputConsumableDecider.java | 5 +++-- .../strategy/VertexwiseSchedulingStrategy.java | 5 + .../DefaultInputConsumableDeciderTest.java | 22 ++ .../strategy/TestingInputConsumableDecider.java| 6 ++ 7 files changed, 61 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java index 6c23757a1a7..f8cbb260488 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java @@ -37,14 +37,15 @@ public class AllFinishedInputConsumableDecider implements InputConsumableDecider executionVertex.getConsumedPartitionGroups()) { if (!consumableStatusCache.computeIfAbsent( -consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) { +consumedPartitionGroup, this::isConsumableBasedOnFinishedProducers)) { return false; } } return true; } -private boolean isConsumedPartitionGroupConsumable( +@Override +public boolean isConsumableBasedOnFinishedProducers( final ConsumedPartitionGroup consumedPartitionGroup) { return consumedPartitionGroup.getNumberOfUnfinishedPartitions() == 0; } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java index 93db09c94b6..ccd354b0d0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java @@ -65,6 +65,19 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider { return true; } +@Override +public boolean isConsumableBasedOnFinishedProducers( +final ConsumedPartitionGroup consumedPartitionGroup) { +if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) { +// For canBePipelined consumed partition group, whether it is consumable does not depend +// on task finish. To optimize performance and avoid unnecessary computation, we simply +// return false. +return false; +} else { +return consumedPartitionGroup.areAllPartitionsFinished(); +} +} + private boolean isConsumedPartitionGroupConsumable( final ConsumedPartitionGroup consumedPartitionGroup, final Set verticesToSchedule) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java index e34cb06bc4e..1d19dd2cf62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java @@ -24,7 +24,7 @@ import java.util.function.Function; /** * {@link InputConsumableDecider} is responsible for determining whether the input of an - * executionVertex is consumable. + * executionVertex or a consumed partition group is consumable. 
*/ public interface InputConsumableDecider { /** @@ -41,6 +41,14 @@ public interface InputConsumableDecider { Set verticesToSchedule, Map consumableStatusCache); +/** + * Determines whether the consumed partition group is consumable based on finished producers. + * + * @param consumedPartitionGroup the consumed partition group to check for consumability. + */ +boolean isConsumableBasedOnFinishedProducers( +final ConsumedPartitionGroup consumedPartitionGroup); + /** Factory for {@link InputConsumableDecider}. */ interface Factory { InputConsumableDecider createInstance( diff
[flink] 01/02: [hotfix] Enrich hybridPartitionDataConsumeConstraint when create AdaptiveBatchScheduler in scheduler benchmark utils.
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7e42493b021e3e012dd429e89588a54d8b151dfd Author: sunxia AuthorDate: Thu Jun 15 18:12:45 2023 +0800 [hotfix] Enrich hybridPartitionDataConsumeConstraint when create AdaptiveBatchScheduler in scheduler benchmark utils. --- .../flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java index 2368e781ec4..a60b3fd584b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java @@ -135,6 +135,8 @@ public class SchedulerBenchmarkUtils { return schedulerBuilder .setVertexParallelismAndInputInfosDecider( createCustomParallelismDecider(jobConfiguration.getParallelism())) +.setHybridPartitionDataConsumeConstraint( + jobConfiguration.getHybridPartitionDataConsumeConstraint()) .setInputConsumableDeciderFactory( loadInputConsumableDeciderFactory( jobConfiguration.getHybridPartitionDataConsumeConstraint()))
[flink] branch release-1.17 updated: [BP-1.17][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22825)
This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 3b5c1f7915e [BP-1.17][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22825) 3b5c1f7915e is described below commit 3b5c1f7915ecfe0f7e353fd342f8d22df5bcd7c4 Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com> AuthorDate: Tue Jun 20 09:48:08 2023 +0800 [BP-1.17][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22825) --- .../table/planner/plan/ExecNodeGraphInternalPlan.java | 1 + .../org/apache/flink/table/api/CompiledPlanITCase.java | 9 - .../apache/flink/table/planner/utils/TableTestBase.scala | 14 ++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java index 576426a63f3..71c9b13b04e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java @@ -79,6 +79,7 @@ public class ExecNodeGraphInternalPlan implements InternalPlan { file.toPath(), serializedPlan.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, +StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE); } catch (IOException e) { throw new TableException("Cannot write the compiled plan to file '" + file + "'.", e); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java index 887d8a8d791..b5485097aa6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java @@ -33,6 +33,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; @@ -258,7 +259,8 @@ public class CompiledPlanITCase extends JsonPlanTestBase { TableResult tableResult = tableEnv.executeSql( String.format( -"COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src", +"COMPILE PLAN '%s' FOR INSERT INTO sink " ++ "SELECT IF(a > b, a, b) AS a, b + 1 AS b, SUBSTR(c, 1, 4) AS c FROM src WHERE a > 10", planPath)); assertThat(tableResult).isEqualTo(TableResultInternal.TABLE_RESULT_OK); @@ -271,6 +273,11 @@ public class CompiledPlanITCase extends JsonPlanTestBase { "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src", planPath))) .isEqualTo(TableResultInternal.TABLE_RESULT_OK); +assertThat( +TableTestUtil.isValidJson( +FileUtils.readFileToString( +planPath.toFile(), StandardCharsets.UTF_8))) +.isTrue(); tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await(); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 0615b66b671..ef570f18156 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.{PojoTypeInfo, 
RowTypeInfo, TupleType import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.BatchExecutionOptions import org.apache.flink.core.testutils.FlinkMatchers.containsMessage +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException import
[flink] branch release-1.16 updated (d4bdca1f76e -> 08bced4646c)
This is an automated email from the ASF dual-hosted git repository. yuxia pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from d4bdca1f76e [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture add 08bced4646c [BP-1.16][FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) (#22826) No new revisions were added by this update. Summary of changes: .../table/planner/plan/ExecNodeGraphInternalPlan.java | 1 + .../org/apache/flink/table/api/CompiledPlanITCase.java | 9 - .../apache/flink/table/planner/utils/TableTestBase.scala | 14 ++ 3 files changed, 23 insertions(+), 1 deletion(-)
[flink] branch master updated: [FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711)
This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4586bcbe662 [FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711) 4586bcbe662 is described below commit 4586bcbe6627ad91001838507b9e1e68ddfc7cdf Author: yuxia Luo AuthorDate: Tue Jun 20 09:02:28 2023 +0800 [FLINK-27240][table] Support ADD PARTITION statement for partitioned table (#22711) Co-authored-by: fengli --- .../src/main/codegen/data/Parser.tdd | 3 ++ .../src/main/codegen/includes/parserImpls.ftl | 42 .../flink/sql/parser/ddl/SqlAddPartitions.java | 7 +++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 19 .../operations/SqlNodeToOperationConversion.java | 15 -- .../SqlAlterTableAddPartitionConverter.java| 56 ++ .../operations/converters/SqlNodeConverters.java | 1 + .../operations/SqlDdlToOperationConverterTest.java | 28 +++ .../flink/table/api/TableEnvironmentTest.scala | 51 9 files changed, 207 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 27289112a3e..3fb6fe854f3 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -31,6 +31,8 @@ "org.apache.flink.sql.parser.ddl.resource.SqlResource" "org.apache.flink.sql.parser.ddl.resource.SqlResourceType" "org.apache.flink.sql.parser.ddl.SqlAddJar" +"org.apache.flink.sql.parser.ddl.SqlAddPartitions" + "org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterTable" @@ -122,6 +124,7 @@ "org.apache.calcite.sql.SqlCreate" "org.apache.calcite.sql.SqlDrop" 
"java.util.ArrayList" +"java.util.Collections" "java.util.HashSet" "java.util.List" "java.util.Set" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 60cb2e038c9..25af4e20f7e 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -591,6 +591,7 @@ SqlAlterTable SqlAlterTable() : SqlIdentifier tableIdentifier; SqlIdentifier newTableIdentifier = null; boolean ifExists = false; +boolean ifNotExists = false; SqlNodeList propertyList = SqlNodeList.EMPTY; SqlNodeList propertyKeyList = SqlNodeList.EMPTY; SqlNodeList partitionSpec = null; @@ -599,6 +600,7 @@ SqlAlterTable SqlAlterTable() : SqlIdentifier originColumnIdentifier; SqlIdentifier newColumnIdentifier; AlterTableContext ctx = new AlterTableContext(); +AlterTableAddPartitionContext addPartitionCtx = new AlterTableAddPartitionContext(); } { { startPos = getPos(); } @@ -650,6 +652,16 @@ SqlAlterTable SqlAlterTable() : } | +( +AlterTableAddPartition(addPartitionCtx) +{ +return new SqlAddPartitions(startPos.plus(getPos()), +tableIdentifier, +addPartitionCtx.ifNotExists, +addPartitionCtx.partSpecs, +addPartitionCtx.partProps); +} +| ( AlterTableAddOrModify(ctx) | @@ -669,6 +681,7 @@ SqlAlterTable SqlAlterTable() : ctx.watermark, ifExists); } +) | ( @@ -923,6 +936,35 @@ SqlTableColumn RegularColumn(TableCreationContext context, SqlIdentifier name, S } } +/** Parses {@code ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec [PARTITION partition_spec][...]}. 
*/ +void AlterTableAddPartition(AlterTableAddPartitionContext context) : +{ +List partSpecs = new ArrayList(); +List partProps = new ArrayList(); +SqlNodeList partSpec; +SqlNodeList partProp; +} +{ +context.ifNotExists = IfNotExistsOpt() +( + +{ +partSpec = new SqlNodeList(getPos()); +partProp = null; +PartitionSpecCommaList(partSpec); +} +[ { partProp = TableProperties(); } ] +{ +partSpecs.add(partSpec); +partProps.add(partProp); +} +)+ +{ +context.partSpecs = partSpecs; +context.partProps = partProps; +} +} + /** Parses {@code ALTER TABLE table_name ADD/MODIFY
[flink-connector-elasticsearch] branch v3.0 updated: [hotfix] Allow unmatched tests (#70)
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git The following commit(s) were added to refs/heads/v3.0 by this push: new 36409d2 [hotfix] Allow unmatched tests (#70) 36409d2 is described below commit 36409d2683de67558e5977b29dc21bd359a5afec Author: Sergey Nuyanzin AuthorDate: Mon Jun 19 13:36:25 2023 +0200 [hotfix] Allow unmatched tests (#70) --- .../src/test/resources/archunit.properties | 2 ++ flink-connector-elasticsearch6/src/test/resources/archunit.properties | 2 ++ flink-connector-elasticsearch7/src/test/resources/archunit.properties | 2 ++ 3 files changed, 6 insertions(+) diff --git a/flink-connector-elasticsearch-base/src/test/resources/archunit.properties b/flink-connector-elasticsearch-base/src/test/resources/archunit.properties index 15be88c..48011f9 100644 --- a/flink-connector-elasticsearch-base/src/test/resources/archunit.properties +++ b/flink-connector-elasticsearch-base/src/test/resources/archunit.properties @@ -29,3 +29,5 @@ freeze.store.default.allowStoreUpdate=true #freeze.refreeze=true freeze.store.default.path=archunit-violations + +archRule.failOnEmptyShould = false diff --git a/flink-connector-elasticsearch6/src/test/resources/archunit.properties b/flink-connector-elasticsearch6/src/test/resources/archunit.properties index 15be88c..48011f9 100644 --- a/flink-connector-elasticsearch6/src/test/resources/archunit.properties +++ b/flink-connector-elasticsearch6/src/test/resources/archunit.properties @@ -29,3 +29,5 @@ freeze.store.default.allowStoreUpdate=true #freeze.refreeze=true freeze.store.default.path=archunit-violations + +archRule.failOnEmptyShould = false diff --git a/flink-connector-elasticsearch7/src/test/resources/archunit.properties b/flink-connector-elasticsearch7/src/test/resources/archunit.properties index 15be88c..48011f9 100644 --- 
a/flink-connector-elasticsearch7/src/test/resources/archunit.properties +++ b/flink-connector-elasticsearch7/src/test/resources/archunit.properties @@ -29,3 +29,5 @@ freeze.store.default.allowStoreUpdate=true #freeze.refreeze=true freeze.store.default.path=archunit-violations + +archRule.failOnEmptyShould = false
[flink-kubernetes-operator] branch main updated: [FLINK-32334] Also check if no taskmanager are running while waiting for cluster shutdown
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 4d9615f5 [FLINK-32334] Also check if no taskmanager are running while waiting for cluster shutdown 4d9615f5 is described below commit 4d9615f5c76672c9b324ed8f4876d62af7fef60e Author: Nicolas Fraison <6404013+ashan...@users.noreply.github.com> AuthorDate: Mon Jun 19 11:57:53 2023 +0200 [FLINK-32334] Also check if no taskmanager are running while waiting for cluster shutdown --- .../operator/service/AbstractFlinkService.java | 12 - .../operator/service/NativeFlinkService.java | 6 + .../operator/service/StandaloneFlinkService.java | 9 +++ .../kubernetes/operator/TestingFlinkService.java | 5 .../service/StandaloneFlinkServiceTest.java| 31 -- 5 files changed, 60 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 90866abd..82aa0cc0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -157,6 +157,8 @@ public abstract class AbstractFlinkService implements FlinkService { protected abstract PodList getJmPodList(String namespace, String clusterId); +protected abstract PodList getTmPodList(String namespace, String clusterId); + protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception; @@ -819,6 +821,7 @@ public abstract class AbstractFlinkService implements FlinkService { LOG.info("Waiting for cluster shutdown..."); boolean jobManagerRunning = true; +boolean 
taskManagerRunning = true; boolean serviceRunning = true; for (int i = 0; i < shutdownTimeout; i++) { @@ -829,6 +832,13 @@ public abstract class AbstractFlinkService implements FlinkService { jobManagerRunning = false; } } +if (taskManagerRunning) { +PodList tmPodList = getTmPodList(namespace, clusterId); + +if (tmPodList.getItems().isEmpty()) { +taskManagerRunning = false; +} +} if (serviceRunning) { Service service = @@ -843,7 +853,7 @@ public abstract class AbstractFlinkService implements FlinkService { } } -if (!jobManagerRunning && !serviceRunning) { +if (!jobManagerRunning && !serviceRunning && !taskManagerRunning) { break; } // log a message waiting to shutdown Flink cluster every 5 seconds. diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index 912a19e1..b1191e21 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -93,6 +93,12 @@ public class NativeFlinkService extends AbstractFlinkService { .list(); } +@Override +protected PodList getTmPodList(String namespace, String clusterId) { +// Native mode does not manage TaskManager +return new PodList(); +} + protected void submitClusterInternal(Configuration conf) throws Exception { LOG.info("Deploying session cluster"); final ClusterClientServiceLoader clusterClientServiceLoader = diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java index bdc6b65f..77aa08b8 100644 --- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java @@ -90,6 +90,15 @@ public class StandaloneFlinkService extends AbstractFlinkService { .list(); } +@Override +protected PodList getTmPodList(String namespace, String clusterId) { +return kubernetesClient +.pods() +.inNamespace(namespace) +
[flink] branch master updated (1a3151b5b9f -> de0efcbe533)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 1a3151b5b9f [FLINK-32233][sql] Introduce SupportsTruncate interface (#22692) add de0efcbe533 [FLINK-31782][runtime] Makes DefaultLeaderElectionService implement MultipleComponentLeaderElectionDriver.Listener (#22640) No new revisions were added by this update. Summary of changes: .../DefaultLeaderElectionService.java | 30 +- 1 file changed, 29 insertions(+), 1 deletion(-)
[flink] branch master updated: [FLINK-32233][sql] Introduce SupportsTruncate interface (#22692)
This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1a3151b5b9f [FLINK-32233][sql] Introduce SupportsTruncate interface (#22692) 1a3151b5b9f is described below commit 1a3151b5b9f1df8aaa221b2febfae0431a69b95d Author: yuxia Luo AuthorDate: Mon Jun 19 17:28:52 2023 +0800 [FLINK-32233][sql] Introduce SupportsTruncate interface (#22692) --- .../connector/sink/abilities/SupportsTruncate.java | 43 ++ 1 file changed, 43 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTruncate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTruncate.java new file mode 100644 index 000..6b5a43d06df --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTruncate.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.sink.abilities; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.sink.DynamicTableSink; + +/** + * Enables deleting all existing data in a {@link DynamicTableSink} table using the {@code TRUNCATE + * TABLE} statement. + * + * For the {@code TRUNCATE TABLE} statement, if the corresponding {@link DynamicTableSink} has + * implemented this interface, then the method {@link #executeTruncation()} will be invoked in the + * execution phase. Otherwise, Flink will throw an exception directly. + */ +@PublicEvolving +public interface SupportsTruncate { + +/** + * Executes the table truncation. + * + * Note: please remember to throw an exception if the truncation hasn't been executed + * successfully, otherwise it will still be considered to have been executed successfully by + * Flink. + */ +void executeTruncation(); +}
[flink] branch master updated: [FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820)
This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f69ed3454f2 [FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) f69ed3454f2 is described below commit f69ed3454f2ab200310edee230da292ee2408503 Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com> AuthorDate: Mon Jun 19 17:22:21 2023 +0800 [FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (#22820) --- .../table/planner/plan/ExecNodeGraphInternalPlan.java | 1 + .../org/apache/flink/table/api/CompiledPlanITCase.java | 9 - .../apache/flink/table/planner/utils/TableTestBase.scala | 14 ++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java index 576426a63f3..71c9b13b04e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java @@ -79,6 +79,7 @@ public class ExecNodeGraphInternalPlan implements InternalPlan { file.toPath(), serializedPlan.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, +StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE); } catch (IOException e) { throw new TableException("Cannot write the compiled plan to file '" + file + "'.", e); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java index 887d8a8d791..b5485097aa6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java @@ -33,6 +33,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; @@ -258,7 +259,8 @@ public class CompiledPlanITCase extends JsonPlanTestBase { TableResult tableResult = tableEnv.executeSql( String.format( -"COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src", +"COMPILE PLAN '%s' FOR INSERT INTO sink " ++ "SELECT IF(a > b, a, b) AS a, b + 1 AS b, SUBSTR(c, 1, 4) AS c FROM src WHERE a > 10", planPath)); assertThat(tableResult).isEqualTo(TableResultInternal.TABLE_RESULT_OK); @@ -271,6 +273,11 @@ public class CompiledPlanITCase extends JsonPlanTestBase { "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src", planPath))) .isEqualTo(TableResultInternal.TABLE_RESULT_OK); +assertThat( +TableTestUtil.isValidJson( +FileUtils.readFileToString( +planPath.toFile(), StandardCharsets.UTF_8))) +.isTrue(); tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await(); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 06f782d66c3..1086e65609c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.java.typeutils.{PojoTypeInfo, 
RowTypeInfo, TupleType import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.BatchExecutionOptions import org.apache.flink.core.testutils.FlinkMatchers.containsMessage +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode import
[flink] branch master updated: [FLINK-32271][build] Bump snappy-java to 1.1.10.1
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ba47237a0a4 [FLINK-32271][build] Bump snappy-java to 1.1.10.1 ba47237a0a4 is described below commit ba47237a0a44222a275ba7a1d144822466d20f15 Author: Ryan Skraba AuthorDate: Fri Jun 16 20:47:24 2023 +0200 [FLINK-32271][build] Bump snappy-java to 1.1.10.1 --- flink-dist/src/main/resources/META-INF/NOTICE | 2 +- .../flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE | 2 +- flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE | 2 +- flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE | 2 +- .../src/main/resources/META-INF/NOTICE | 2 +- pom.xml | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE index 7cbacd9e17c..5d9a217e153 100644 --- a/flink-dist/src/main/resources/META-INF/NOTICE +++ b/flink-dist/src/main/resources/META-INF/NOTICE @@ -19,7 +19,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.javassist:javassist:3.24.0-GA - org.lz4:lz4-java:1.8.0 - org.objenesis:objenesis:2.1 -- org.xerial.snappy:snappy-java:1.1.8.3 +- org.xerial.snappy:snappy-java:1.1.10.1 This project bundles the following dependencies under the BSD license. See bundled license files for details. 
diff --git a/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE index fa5e0b26f85..a7b54bcf948 100644 --- a/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE +++ b/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE @@ -31,7 +31,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.apache.kerby:kerby-asn1:1.0.1 - org.apache.kerby:kerby-pkix:1.0.1 - org.apache.kerby:kerby-util:1.0.1 -- org.xerial.snappy:snappy-java:1.1.8.3 +- org.xerial.snappy:snappy-java:1.1.10.1 This project bundles the following dependencies under the MIT (https://opensource.org/licenses/MIT) diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE index 8f0f179f326..fa67a18717e 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE +++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE @@ -41,7 +41,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.apache.kerby:kerby-pkix:1.0.1 - org.apache.kerby:kerby-util:1.0.1 - org.wildfly.openssl:wildfly-openssl:1.0.7.Final -- org.xerial.snappy:snappy-java:1.1.8.3 +- org.xerial.snappy:snappy-java:1.1.10.1 - software.amazon.ion:ion-java:1.0.2 This project bundles the following dependencies under BSD-2 License (https://opensource.org/licenses/BSD-2-Clause). 
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE index dc42b334706..ab442ff671f 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE +++ b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE @@ -55,7 +55,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.apache.kerby:kerby-util:1.0.1 - org.weakref:jmxutils:1.19 - org.wildfly.openssl:wildfly-openssl:1.0.7.Final -- org.xerial.snappy:snappy-java:1.1.8.3 +- org.xerial.snappy:snappy-java:1.1.10.1 - software.amazon.ion:ion-java:1.0.2 This project bundles the following dependencies under BSD-2 License (https://opensource.org/licenses/BSD-2-Clause). diff --git a/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE index 6f6c6786a8d..fb62caf7ea9 100644 --- a/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE +++ b/flink-formats/flink-sql-avro-confluent-registry/src/main/resources/META-INF/NOTICE @@ -17,7 +17,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.apache.commons:commons-compress:1.21 - org.apache.kafka:kafka-clients:7.2.2-ccs - org.glassfish.jersey.core:jersey-common:2.30 -- org.xerial.snappy:snappy-java:1.1.8.3 +- org.xerial.snappy:snappy-java:1.1.10.1 The binary distribution of this product bundles these dependencies under the Eclipse Public License - v 2.0 (https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.txt) diff
[flink] branch master updated: [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717)
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 6e22b94e708 [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717) 6e22b94e708 is described below commit 6e22b94e708052ac450f84bce954a4d3ec7bb772 Author: Hanyu Zheng <135176127+hanyuzhe...@users.noreply.github.com> AuthorDate: Mon Jun 19 01:09:20 2023 -0700 [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717) --- docs/data/sql_functions.yml| 3 + docs/data/sql_functions_zh.yml | 3 + .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 9 +++ .../flink/table/api/internal/BaseExpressions.java | 35 .../functions/BuiltInFunctionDefinitions.java | 10 +++ .../table/types/inference/InputTypeStrategies.java | 8 ++ .../functions/CollectionFunctionsITCase.java | 94 +- .../functions/scalar/ArrayConcatFunction.java | 79 ++ 9 files changed, 241 insertions(+), 1 deletion(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 12b032351f9..87c8b0f9486 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -646,6 +646,9 @@ collection: - sql: ARRAY_UNION(array1, array2) table: haystack.arrayUnion(array) description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null. + - sql: ARRAY_CONCAT(array1, ...) +table: array1.arrayConcat(...) +description: Returns an array that is the result of concatenating at least one array. This array contains all the elements in the first array, followed by all the elements in the second array, and so forth, up to the Nth array. If any input array is NULL, the function returns NULL. - sql: MAP_KEYS(map) table: MAP.mapKeys() description: Returns the keys of the map as array. No order guaranteed. 
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index efb16a2c633..f5249408a6a 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -724,6 +724,9 @@ collection: - sql: CARDINALITY(map) table: MAP.cardinality() description: 返回 map 中的 entries 数量。 + - sql: ARRAY_CONCAT(array1, ...) +table: array1.arrayConcat(...) +description: 返回一个数组,该数组是连接至少一个数组的结果。该数组包含第一个数组中的所有元素,然后是第二个数组中的所有元素,依此类推,直到第 N 个数组。如果任何输入数组为 NULL,则函数返回 NULL。 - sql: map ‘[’ value ‘]’ table: MAP.at(ANY) description: 返回 map 中指定 key 对应的值。 diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 86e93fefccf..0f9ac9bdc1d 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -225,6 +225,7 @@ advanced type helper functions Expression.at Expression.cardinality Expression.element +Expression.array_concat Expression.array_contains Expression.array_distinct Expression.array_position diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 52967d542c5..78984fd5d32 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1519,6 +1519,15 @@ class Expression(Generic[T]): """ return _binary_op("arrayUnion")(self, array) +def array_concat(self, *arrays) -> 'Expression': +""" +Returns an array that is the result of concatenating at least one array. +This array contains all the elements in the first array, followed by all +the elements in the second array, and so forth, up to the Nth array. +If any input array is NULL, the function returns NULL. 
+""" +return _binary_op("arrayConcat")(self, *arrays) + @property def map_keys(self) -> 'Expression': """ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index b308c049b19..5caefd52b2d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -53,6 +53,7 @@ import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
[flink-connector-elasticsearch] branch v3.0 updated: [FLINK-32357] Elasticsearch v3.0 won't compile when testing against Flink 1.17.1
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git The following commit(s) were added to refs/heads/v3.0 by this push: new 9876d39 [FLINK-32357] Elasticsearch v3.0 won't compile when testing against Flink 1.17.1 9876d39 is described below commit 9876d39269b36b2a47f819541fb10e774e573e09 Author: Andriy Redko AuthorDate: Mon Jun 19 03:44:03 2023 -0400 [FLINK-32357] Elasticsearch v3.0 won't compile when testing against Flink 1.17.1 Signed-off-by: Andriy Redko --- pom.xml | 18 +- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/pom.xml b/pom.xml index 5b088d1..c6ecfa7 100644 --- a/pom.xml +++ b/pom.xml @@ -57,12 +57,11 @@ under the License. 4.13.2 5.8.1 3.21.0 - 0.22.0 1.17.2 2.21.0 false - 1.15.0 + 3.0.0-1.16.0 1.7.36 2.17.2 @@ -337,21 +336,6 @@ under the License. pom import - - - com.tngtech.archunit - archunit - ${archunit.version} - test - - - - com.tngtech.archunit - archunit-junit5 - ${archunit.version} - test - -
[flink] branch master updated (fa94fb5a027 -> 1ca19805ad3)
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from fa94fb5a027 [FLINK-31721][core] Move JobStatusHook to flink-core module add 1ca19805ad3 [hotfix][state/changelog] Fix the description of ChangelogCompatibilityITCase No new revisions were added by this update. Summary of changes: .../org/apache/flink/test/state/ChangelogCompatibilityITCase.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-)
[flink] branch release-1.16 updated (f9394025fb7 -> d4bdca1f76e)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from f9394025fb7 [FLINK-32136][python] Fix the issue that Pyflink gateway server launch fails when purelib != platlib add d4bdca1f76e [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture No new revisions were added by this update. Summary of changes: .../KubernetesCheckpointIDCounterTest.java | 4 ++-- .../KubernetesLeaderElectionDriverTest.java| 27 +++--- .../KubernetesStateHandleStoreTest.java| 6 ++--- .../highavailability/KubernetesTestFixture.java| 5 ++-- 4 files changed, 21 insertions(+), 21 deletions(-)
[flink] branch release-1.17 updated: [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new c359fe884b6 [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture c359fe884b6 is described below commit c359fe884b6e23f47f7904c8be0d3081b05ebd48 Author: Matthias Pohl AuthorDate: Fri Jun 16 14:43:15 2023 +0200 [FLINK-32368][test] Fixes wrong interface implementation in KubernetesTestFixture Signed-off-by: Matthias Pohl --- .../KubernetesCheckpointIDCounterTest.java | 4 ++-- .../KubernetesLeaderElectionDriverTest.java| 27 +++--- .../KubernetesStateHandleStoreTest.java| 6 ++--- .../highavailability/KubernetesTestFixture.java| 5 ++-- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java index 9e206353e17..953cd7522dc 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java @@ -25,7 +25,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.junit.jupiter.api.Test; -import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; @@ -159,7 +159,7 @@ class KubernetesCheckpointIDCounterTest extends KubernetesHighAvailabilityTestBa checkpointIDCounter .shutdown(JobStatus.FINISHED) .get()) - .satisfies(anyCauseMatches(CompletionException.class)); + 
.satisfies(anyCauseMatches(ExecutionException.class)); // fixing the internal issue should make the shutdown succeed again KubernetesUtils.createConfigMapIfItDoesNotExist( diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java index a7654938cd0..af33b03779e 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java @@ -20,6 +20,8 @@ package org.apache.flink.kubernetes.highavailability; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException; +import org.apache.flink.runtime.leaderelection.LeaderElectionException; import org.apache.flink.runtime.leaderelection.LeaderInformation; import org.junit.jupiter.api.Test; @@ -28,7 +30,6 @@ import java.util.Collections; import java.util.Map; import java.util.UUID; -import static org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses; import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY; import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY; @@ -91,9 +92,9 @@ class KubernetesLeaderElectionDriverTest extends KubernetesHighAvailabilityTestB electionEventHandler.waitForError(); final String errorMsg = "ConfigMap " + LEADER_CONFIGMAP_NAME + " does not exist."; - assertThat(electionEventHandler.getError()).isNotNull(); - assertThatChainOfCauses(electionEventHandler.getError()) -.anySatisfy(t -> 
assertThat(t).hasMessageContaining(errorMsg)); +assertThat(electionEventHandler.getError()) +.isInstanceOf(KubernetesException.class) +.hasMessage(errorMsg); }); } }; @@ -136,9 +137,9 @@ class KubernetesLeaderElectionDriverTest extends KubernetesHighAvailabilityTestB