(flink) branch release-1.17 updated: [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 2dd1076dbbf [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier 2dd1076dbbf is described below commit 2dd1076dbbf0f47e4b5d15e67e0265679397f93e Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Fri Dec 1 16:04:36 2023 +0800 [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier --- .../streaming/runtime/io/RecordWriterOutput.java | 4 +- .../runtime/io/RecordWriterOutputTest.java | 112 + 2 files changed, 113 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 93acf23d30b..478fe4e3255 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -149,9 +149,7 @@ public class RecordWriterOutput implements WatermarkGaugeExposingOutputhttp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder; +import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RecordWriterOutput}. */ +class RecordWriterOutputTest { + +@ParameterizedTest +@ValueSource(booleans = {true, false}) +void testDisableUnalignedCheckpoint(boolean supportsUnalignedCheckpoints) throws IOException { +Queue> queue = new LinkedList<>(); + +RecordWriter>> task1 = +new RecordWriterBuilder>>() +.build( +new MockResultPartitionWriter() { +@Override +public void broadcastEvent( +AbstractEvent event, boolean isPriorityEvent) { +queue.add(Tuple2.of(event, isPriorityEvent)); +} +}); + +RecordWriterOutput writerOutput = +new RecordWriterOutput<>( +task1, LongSerializer.INSTANCE, null, supportsUnalignedCheckpoints); + +// Test unalignedBarrier +CheckpointBarrier unalignedBarrier = +new CheckpointBarrier( +0, +1L, +CheckpointOptions.unaligned( +CheckpointType.CHECKPOINT, + CheckpointStorageLocationReference.getDefault())); + +writerOutput.broadcastEvent(unalignedBarrier, true); +assertAlignmentTypeAndIsPriorityEvent( +queue.poll(), +supportsUnalignedCheckpoints +? AlignmentType.UNALIGNED +: AlignmentType.FORCED_ALIGNED, +supportsUnalignedCheckpoints); + +// Test alignedTimeoutBarrier +CheckpointBarrier alignedTimeoutBarrier = +new CheckpointBarrier( +0, +1L, +CheckpointOptions.alignedWithTimeout( +
(flink) branch release-1.18 updated: [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 3884f65a3d0 [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier 3884f65a3d0 is described below commit 3884f65a3d049acd5f092d5b48a1d02ff24b77e6 Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Fri Dec 1 16:04:36 2023 +0800 [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier --- .../streaming/runtime/io/RecordWriterOutput.java | 4 +- .../runtime/io/RecordWriterOutputTest.java | 112 + 2 files changed, 113 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 44e41e92197..43b2d8400b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -182,9 +182,7 @@ public class RecordWriterOutput } public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { -if (isPriorityEvent -&& event instanceof CheckpointBarrier -&& !supportsUnalignedCheckpoints) { +if (event instanceof CheckpointBarrier && !supportsUnalignedCheckpoints) { final CheckpointBarrier barrier = (CheckpointBarrier) event; event = barrier.withOptions(barrier.getCheckpointOptions().withUnalignedUnsupported()); isPriorityEvent = false; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java new file mode 100644 index 000..d1bf139629e --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder; +import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RecordWriterOutput}. */ +class RecordWriterOutputTest { + +@ParameterizedTest +@ValueSource(booleans = {true, false}) +void testDisableUnalignedCheckpoint(boolean supportsUnalignedCheckpoints) throws IOException { +Queue> queue = new LinkedList<>(); + +RecordWriter>> task1 = +new RecordWriterBuilder>>() +.build( +new MockResultPartitionWriter() { +@Override +public void broadcastEvent( +AbstractEvent event, boolean isPriorityEvent) { +
(flink) 01/02: [FLINK-33480] Implement restore tests for GroupAggregate node
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 193b1c68976cdfbd66147278f23d7d427d9b5562 Author: bvarghese1 AuthorDate: Tue Nov 7 17:03:53 2023 -0800 [FLINK-33480] Implement restore tests for GroupAggregate node --- .../exec/stream/GroupAggregateRestoreTest.java | 46 ++ .../exec/stream/GroupAggregateTestPrograms.java| 374 + .../plan/nodes/exec/testutils/RestoreTestBase.java | 1 + .../plan/group-aggregate-distinct-mini-batch.json | 621 + .../savepoint/_metadata| Bin 0 -> 25689 bytes .../plan/group-aggregate-distinct.json | 359 .../group-aggregate-distinct/savepoint/_metadata | Bin 0 -> 27875 bytes .../plan/group-aggregate-simple-mini-batch.json| 351 .../savepoint/_metadata| Bin 0 -> 9479 bytes .../plan/group-aggregate-simple.json | 272 + .../group-aggregate-simple/savepoint/_metadata | Bin 0 -> 10954 bytes .../group-aggregate-udf-with-merge-mini-batch.json | 400 + .../savepoint/_metadata| Bin 0 -> 17762 bytes .../plan/group-aggregate-udf-with-merge.json | 211 +++ .../savepoint/_metadata| Bin 0 -> 19881 bytes ...oup-aggregate-udf-without-merge-mini-batch.json | 254 + .../savepoint/_metadata| Bin 0 -> 12731 bytes .../plan/group-aggregate-udf-without-merge.json| 231 .../savepoint/_metadata| Bin 0 -> 14618 bytes 19 files changed, 3120 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java new file mode 100644 index 000..2c5043d9f4d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecGroupAggregate}. */ +public class GroupAggregateRestoreTest extends RestoreTestBase { + +public GroupAggregateRestoreTest() { +super(StreamExecGroupAggregate.class); +} + +@Override +public List programs() { +return Arrays.asList( +GroupAggregateTestPrograms.GROUP_BY_SIMPLE, +GroupAggregateTestPrograms.GROUP_BY_SIMPLE_MINI_BATCH, +GroupAggregateTestPrograms.GROUP_BY_DISTINCT, +GroupAggregateTestPrograms.GROUP_BY_DISTINCT_MINI_BATCH, +GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE, +GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE_MINI_BATCH, +GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE, + GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE_MINI_BATCH); +} +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java new file mode 100644 index 000..b6a61a72144 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you
(flink) branch master updated (f751a00fd6f -> fac3ac78667)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from f751a00fd6f [FLINK-33758] Implement restore tests for TemporalSort node (#23879) new 193b1c68976 [FLINK-33480] Implement restore tests for GroupAggregate node new fac3ac78667 [FLINK-33480] Remove GroupAggJsonPlanTest & GroupAggJsonPlanITCase The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../exec/stream/GroupAggregateJsonPlanTest.java| 182 -- .../exec/stream/GroupAggregateRestoreTest.java | 46 +++ .../exec/stream/GroupAggregateTestPrograms.java| 374 +++ .../plan/nodes/exec/testutils/RestoreTestBase.java | 1 + .../jsonplan/GroupAggregateJsonPlanITCase.java | 205 --- ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 314 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 402 - ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 304 .../plan/group-aggregate-distinct-mini-batch.json} | 220 +-- .../savepoint/_metadata| Bin 0 -> 25689 bytes .../plan/group-aggregate-distinct.json}| 176 - .../group-aggregate-distinct/savepoint/_metadata | Bin 0 -> 27875 bytes .../plan/group-aggregate-simple-mini-batch.json} | 196 +- .../savepoint/_metadata| Bin 0 -> 9479 bytes .../plan/group-aggregate-simple.json} | 134 +++ .../group-aggregate-simple/savepoint/_metadata | Bin 0 -> 10954 bytes .../group-aggregate-udf-with-merge-mini-batch.json | 400 .../savepoint/_metadata| Bin 0 -> 17762 bytes .../plan/group-aggregate-udf-with-merge.json | 211 +++ .../savepoint/_metadata| Bin 0 -> 19881 bytes ...up-aggregate-udf-without-merge-mini-batch.json} | 225 +--- .../savepoint/_metadata| Bin 0 -> 12731 bytes .../plan/group-aggregate-udf-without-merge.json} | 208 --- .../savepoint/_metadata| Bin 0 -> 14618 bytes 24 files changed, 1571 insertions(+), 2027 deletions(-) delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java delete mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out delete mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out delete mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out rename flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out => restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/plan/group-aggregate-distinct-mini-batch.json} (78%) create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/savepoint/_metadata rename flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out => restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/plan/group-aggregate-distinct.json} (67%) create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/savepoint/_metadata rename flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out =>
(flink) 02/02: [FLINK-33480] Remove GroupAggJsonPlanTest & GroupAggJsonPlanITCase
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit fac3ac786674f9b6ce5716902e74b1533ccb1c0a Author: bvarghese1 AuthorDate: Tue Nov 7 17:05:20 2023 -0800 [FLINK-33480] Remove GroupAggJsonPlanTest & GroupAggJsonPlanITCase --- .../exec/stream/GroupAggregateJsonPlanTest.java| 182 -- .../jsonplan/GroupAggregateJsonPlanITCase.java | 205 --- ...tDistinctAggCalls[isMiniBatchEnabled=false].out | 345 ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 607 - ...gCallsWithGroupBy[isMiniBatchEnabled=false].out | 288 -- ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out | 367 - ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 314 --- ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 402 -- ...erDefinedAggCalls[isMiniBatchEnabled=false].out | 281 -- ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 304 --- 10 files changed, 3295 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java deleted file mode 100644 index e655a3527ea..000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.CountDistinct; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.time.Duration; -import java.util.Arrays; -import java.util.List; - -/** Test json serialization/deserialization for group aggregate. */ -@ExtendWith(ParameterizedTestExtension.class) -class GroupAggregateJsonPlanTest extends TableTestBase { - -@Parameter private boolean isMiniBatchEnabled; - -private StreamTableTestUtil util; -private TableEnvironment tEnv; - -@Parameters(name = "isMiniBatchEnabled={0}") -private static List testData() { -return Arrays.asList(true, false); -} - -@BeforeEach -void setup() { -util = streamTestUtil(TableConfig.getDefault()); -tEnv = util.getTableEnv(); -if (isMiniBatchEnabled) { -tEnv.getConfig() -.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) -.set( - ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, -Duration.ofSeconds(10)) -.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L); -} else { - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false); -} - -String srcTableDdl = -"CREATE TABLE MyTable (\n" -+ " a bigint,\n" -+ " b int not null,\n" -+ " c varchar,\n" -+
(flink) branch master updated: [FLINK-33758] Implement restore tests for TemporalSort node (#23879)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f751a00fd6f [FLINK-33758] Implement restore tests for TemporalSort node (#23879) f751a00fd6f is described below commit f751a00fd6f0e70187d2a9ae2ccd6a728d9a2c64 Author: James Hughes AuthorDate: Thu Dec 7 07:51:40 2023 -0500 [FLINK-33758] Implement restore tests for TemporalSort node (#23879) --- .../exec/stream/TemporalSortJsonPlanTest.java | 68 .../nodes/exec/stream/TemporalSortRestoreTest.java | 40 + .../exec/stream/TemporalSortTestPrograms.java | 96 +++ .../stream/jsonplan/TemporalSortJsonITCase.java| 87 -- .../plan/temporal-sort-proctime.json} | 180 +++-- .../temporal-sort-proctime/savepoint/_metadata | Bin 0 -> 6037 bytes .../plan/temporal-sort-rowtime.json} | 144 +++-- .../temporal-sort-rowtime/savepoint/_metadata | Bin 0 -> 10729 bytes 8 files changed, 210 insertions(+), 405 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java deleted file mode 100644 index 8ebd6a47e59..000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for temporal sort. */ -class TemporalSortJsonPlanTest extends TableTestBase { - -private StreamTableTestUtil util; - -@BeforeEach -void setup() { -util = streamTestUtil(TableConfig.getDefault()); -TableEnvironment tEnv = util.getTableEnv(); - -String srcTableDdl = -"CREATE TABLE MyTable (\n" -+ " a INT,\n" -+ " b BIGINT,\n" -+ " c VARCHAR,\n" -+ " `rowtime` AS TO_TIMESTAMP(c),\n" -+ " proctime as PROCTIME(),\n" -+ " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" -+ ") WITH (\n" -+ " 'connector' = 'values')\n"; -tEnv.executeSql(srcTableDdl); - -String sinkTableDdl = -"CREATE TABLE MySink (\n" -+ " a INT\n" -+ ") WITH (\n" -+ " 'connector' = 'values')\n"; -tEnv.executeSql(sinkTableDdl); -} - -@Test -void testSortProcessingTime() { -util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by proctime, c"); -} - -@Test -void testSortRowTime() { -util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by rowtime, c"); -} -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java new file mode 100644 index 000..f6a1b0fcea9 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + *
(flink-kubernetes-operator) branch main updated: [Flink 31966] Flink Kubernetes operator lacks TLS support
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 30566ed7 [Flink 31966] Flink Kubernetes operator lacks TLS support 30566ed7 is described below commit 30566ed7390a4fce1dc3e239efe3c519c55aa9b1 Author: Tony Garrard AuthorDate: Thu Dec 7 12:16:39 2023 + [Flink 31966] Flink Kubernetes operator lacks TLS support Signed-off-by: A. Garrard --- docs/content/docs/operations/helm.md | 4 + examples/README.md | 5 + .../basic-secure-deployment-only.yaml | 49 .../basic-secure-session-job-only.yaml | 42 +++ examples/flink-tls-example/basic-secure.yaml | 47 +++ examples/flink-tls-example/pre-install.yaml| 106 +++ .../operator/service/AbstractFlinkService.java | 68 - .../operator/service/NativeFlinkService.java | 2 +- .../operator/service/StandaloneFlinkService.java | 2 +- .../flink/kubernetes/operator/utils/EnvUtils.java | 3 + .../kubernetes/operator/TestingFlinkService.java | 2 +- .../operator/service/AbstractFlinkServiceTest.java | 2 +- .../operator/service/SecureFlinkServiceTest.java | 319 + .../src/test/resources/keystore.jks| Bin 0 -> 3018 bytes .../src/test/resources/truststore.jks | Bin 0 -> 861 bytes .../templates/flink-operator.yaml | 21 ++ helm/flink-kubernetes-operator/values.yaml | 8 + 17 files changed, 671 insertions(+), 9 deletions(-) diff --git a/docs/content/docs/operations/helm.md b/docs/content/docs/operations/helm.md index bb84a2b5..dee6efd0 100644 --- a/docs/content/docs/operations/helm.md +++ b/docs/content/docs/operations/helm.md @@ -111,6 +111,10 @@ The configurable parameters of the Helm chart and which default values as detail | operatorHealth.livenessProbe | Liveness probe configuration for the operator using the health endpoint. Only time settings should be configured, endpoint is set automatically based on port. | [...] | operatorHealth.startupProbe| Startup probe configuration for the operator using the health endpoint. Only time settings should be configured, endpoint is set automatically based on port. | [...] | postStart | The postStart hook configuration for the main container. | [...] +| tls.create | Whether to mount an optional secret containing a tls truststore for the flink-kubernetes-operator. | false [...] +| tls.secretName | The name of the tls secret | flink-operator-cert [...] +| tls.secretKeyRef.name | The name of the secret containing the password for the java keystore/truststore | operator-certificate-password
(flink-kubernetes-operator) branch main updated: Add Java client library update process to the Upgrade page (#723)
This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 98ad5b82 Add Java client library update process to the Upgrade page (#723) 98ad5b82 is described below commit 98ad5b822b999c3079873993d093af8a77ed4002 Author: Maximilian Michels AuthorDate: Thu Dec 7 12:16:40 2023 +0100 Add Java client library update process to the Upgrade page (#723) --- docs/content/docs/operations/upgrade.md | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/content/docs/operations/upgrade.md b/docs/content/docs/operations/upgrade.md index 3b570a39..1bb3d350 100644 --- a/docs/content/docs/operations/upgrade.md +++ b/docs/content/docs/operations/upgrade.md @@ -43,7 +43,13 @@ If you are upgrading from `kubernetes-operator-1.0.0` or later, please refer to We will cover these steps in detail in the next sections. -### 1. Upgrading the CRD +### 1. Upgrading the Java client library + +If you use the Flink Kubernetes operator Java client library, you need to update it first to ensure that responses from +the new operator version can be parsed properly. For minor releases, the new version of the Java library is +backwards-compatible with the previous minor version of the operator. + +### 2. Upgrading the CRD The first step of the upgrade process is upgrading the CRDs for `FlinkDeployment` and `FlinkSessionJob` resources. This step must be completed manually and is not part of the helm installation logic. @@ -57,7 +63,7 @@ kubectl replace -f helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.ap Please note that we are using the `replace` command here which ensures that running deployments are unaffected. {{< /hint >}} -### 2. Upgrading the Helm deployment +### 3. Upgrading the Helm deployment {{< hint danger >}} Before upgrading, please compare the version difference between the currently generated yaml and the running yaml, which will be used for backup and restore.
(flink) branch master updated: [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new a4d14ec224b [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier a4d14ec224b is described below commit a4d14ec224bbaf498ca2e912ff51539872a34878 Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Fri Dec 1 16:04:36 2023 +0800 [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier --- .../streaming/runtime/io/RecordWriterOutput.java | 4 +- .../runtime/io/RecordWriterOutputTest.java | 112 + 2 files changed, 113 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 44e41e92197..43b2d8400b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -182,9 +182,7 @@ public class RecordWriterOutput } public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { -if (isPriorityEvent -&& event instanceof CheckpointBarrier -&& !supportsUnalignedCheckpoints) { +if (event instanceof CheckpointBarrier && !supportsUnalignedCheckpoints) { final CheckpointBarrier barrier = (CheckpointBarrier) event; event = barrier.withOptions(barrier.getCheckpointOptions().withUnalignedUnsupported()); isPriorityEvent = false; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java new file mode 100644 index 000..d1bf139629e --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder; +import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RecordWriterOutput}. */ +class RecordWriterOutputTest { + +@ParameterizedTest +@ValueSource(booleans = {true, false}) +void testDisableUnalignedCheckpoint(boolean supportsUnalignedCheckpoints) throws IOException { +Queue> queue = new LinkedList<>(); + +RecordWriter>> task1 = +new RecordWriterBuilder>>() +.build( +new MockResultPartitionWriter() { +@Override +public void broadcastEvent( +AbstractEvent event, boolean isPriorityEvent) { +
(flink-connector-shared-utils) branch parent_pom updated: [FLINK-32894] Use 3.5.1 for maven-shade-plugin to support Java 17/21
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch parent_pom in repository https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git The following commit(s) were added to refs/heads/parent_pom by this push: new 7c2745a [FLINK-32894] Use 3.5.1 for maven-shade-plugin to support Java 17/21 7c2745a is described below commit 7c2745af777c6681b9eb14f0b05b2136fb141784 Author: Qingsheng Ren AuthorDate: Thu Dec 7 17:50:32 2023 +0800 [FLINK-32894] Use 3.5.1 for maven-shade-plugin to support Java 17/21 --- pom.xml | 7 +-- 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index fb93bac..3afa1b3 100644 --- a/pom.xml +++ b/pom.xml @@ -437,7 +437,7 @@ under the License. org.apache.maven.plugins maven-shade-plugin -3.1.1 +3.5.1 @@ -877,11 +877,6 @@ under the License. - -org.apache.maven.plugins -maven-shade-plugin -3.2.4 - io.github.zentol.japicmp japicmp-maven-plugin
(flink-connector-shared-utils) branch ci_utils updated: [FLINK-33556][CI] Test infrastructure for externalized python code
This is an automated email from the ASF dual-hosted git repository. gaborgsomogyi pushed a commit to branch ci_utils in repository https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git The following commit(s) were added to refs/heads/ci_utils by this push: new 7691962 [FLINK-33556][CI] Test infrastructure for externalized python code 7691962 is described below commit 7691962b8031536a2baa4d39252a38198ba91dc5 Author: pvary AuthorDate: Thu Dec 7 10:33:52 2023 +0100 [FLINK-33556][CI] Test infrastructure for externalized python code --- .github/workflows/python_ci.yml | 86 python/README.md| 6 + python/build-wheels.sh | 52 +++ python/glibc_version_fix.h | 17 + python/install_command.sh | 31 ++ python/lint-python.sh | 876 6 files changed, 1068 insertions(+) diff --git a/.github/workflows/python_ci.yml b/.github/workflows/python_ci.yml new file mode 100644 index 000..89e10d9 --- /dev/null +++ b/.github/workflows/python_ci.yml @@ -0,0 +1,86 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +on: + workflow_call: +inputs: + flink_version: +description: "Flink version to test against." +required: true +type: string + timeout_global: +description: "The timeout in minutes for the entire workflow." +required: false +type: number +default: 80 + timeout_test: +description: "The timeout in minutes for the test compile" +required: false +type: number +default: 50 + connector_branch: +description: "Branch that need to be checked out" +required: false +type: string + +jobs: + python_test: +runs-on: ubuntu-latest +timeout-minutes: ${{ inputs.timeout_global }} +env: + MVN_COMMON_OPTIONS: -U -B --no-transfer-progress -Dflink.version=${{ inputs.flink_version }} -DskipTests + MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 + MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out" +steps: + - run: echo "Running CI pipeline for JDK version 8" + + - name: Check out repository code +uses: actions/checkout@v3 +with: + ref: "${{ inputs.connector_branch }}" + + - name: Set JDK +uses: actions/setup-java@v3 +with: + java-version: 8 + distribution: 'temurin' + cache: 'maven' + + - name: Set Maven 3.8.6 +uses: stCarolas/setup-maven@v4.5 +with: + maven-version: 3.8.6 + + - name: Compile +timeout-minutes: ${{ inputs.timeout_test }} +run: | + set -o pipefail + + mvn clean install ${MVN_COMMON_OPTIONS} \ +${{ env.MVN_CONNECTION_OPTIONS }} \ +-Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties \ +| tee ${{ env.MVN_BUILD_OUTPUT_FILE }} + + - name: Run Python test +timeout-minutes: ${{ inputs.timeout_test }} +run: | + set -o pipefail + + cd flink-python + chmod a+x dev/* + ./dev/lint-python.sh -e mypy,sphinx | tee ${{ env.MVN_BUILD_OUTPUT_FILE }} diff --git a/python/README.md b/python/README.md new file mode 100644 index 000..0dc8217 --- /dev/null +++ b/python/README.md @@ -0,0 +1,6 @@ +This directory contains commonly used scripts for testing and creating python packages for connectors. + +The original version of the files are based on +https://github.com/apache/flink/tree/release-1.17.2/flink-python/dev. + +Created FLINK-33762 to make these scripts versioned to allow backward incompatible changes in the future. diff --git a/python/build-wheels.sh b/python/build-wheels.sh new file mode 100755 index 000..7f18a91 --- /dev/null +++ b/python/build-wheels.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license
(flink) branch master updated (e4f389895d9 -> ca1c7ce4812)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from e4f389895d9 [FLINK-33556] Test infrastructure for externalized python code add ca1c7ce4812 [FLINK-33666][table] Use the same constraint name in MergeTableLikeUtil and Schema No new revisions were added by this update. Summary of changes: docs/content.zh/docs/dev/table/sql/show.md | 4 +- docs/content/docs/dev/table/sql/show.md| 4 +- .../src/test/resources/sql/table.q | 82 +++--- .../src/test/resources/sql/table.q | 8 +-- .../planner/operations/MergeTableLikeUtil.java | 7 +- .../planner/operations/MergeTableLikeUtilTest.java | 4 +- .../testChangelogSource.out| 2 +- .../testUpsertSource.out | 2 +- ...WithNonDeterministicFuncSinkWithDifferentPk.out | 4 +- .../testJoinTemporalFunction.out | 2 +- .../testTemporalTableJoin.out | 2 +- .../plan/deduplicate-asc-proctime.json | 4 +- .../deduplicate-asc/plan/deduplicate-asc.json | 4 +- .../deduplicate-desc/plan/deduplicate-desc.json| 4 +- .../stream-exec-expand_1/expand/plan/expand.json | 4 +- 15 files changed, 71 insertions(+), 66 deletions(-)
(flink) branch master updated (4eb5b588e4d -> e4f389895d9)
This is an automated email from the ASF dual-hosted git repository. gaborgsomogyi pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 4eb5b588e4d [FLINK-33726][sql-client] print time cost for streaming queries add e4f389895d9 [FLINK-33556] Test infrastructure for externalized python code No new revisions were added by this update. Summary of changes: flink-python/pyflink/pyflink_gateway_server.py | 17 +++-- 1 file changed, 11 insertions(+), 6 deletions(-)
(flink-kubernetes-operator) branch main updated: [FLINK-33645] Taskmanager env vars in config not given to taskmanager
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new e4528260 [FLINK-33645] Taskmanager env vars in config not given to taskmanager e4528260 is described below commit e4528260be78dabeb6552afda10e78802021984d Author: Tony Garrard AuthorDate: Thu Dec 7 08:11:48 2023 + [FLINK-33645] Taskmanager env vars in config not given to taskmanager Signed-off-by: A. Garrard --- .../StandaloneKubernetesTaskManagerParameters.java | 8 ++-- .../operator/kubeclient/utils/TestUtils.java | 5 + .../KubernetesStandaloneClusterDescriptorTest.java | 22 ++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java index de313f27..2517b756 100644 --- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java @@ -19,6 +19,8 @@ package org.apache.flink.kubernetes.operator.kubeclient.parameters; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; @@ -73,8 +75,10 @@ public class StandaloneKubernetesTaskManagerParameters extends AbstractKubernete @Override public Map getEnvironments() { -// TMs have environment set using the pod template. -return new HashMap<>(); +// TMs have environment set using the pod template and config containerized.taskmanager.env +return new HashMap<>( +ConfigurationUtils.getPrefixedKeyValuePairs( + ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX, flinkConfig)); } @Override diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java index 8d73dd10..203275f8 100644 --- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java @@ -54,6 +54,11 @@ public class TestUtils { public static final double TASK_MANAGER_CPU = 4; public static final double JOB_MANAGER_CPU = 2; +public static final String USER_ENV_VAR = "USER_ENV"; + +public static final String JM_ENV_VALUE = "TEST_JM"; +public static final String TM_ENV_VALUE = "TEST_TM"; + public static Map generateTestStringStringMap( String keyPrefix, String valuePrefix, int entries) { Map map = new HashMap<>(); diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java index 921bccf0..b53c51a8 100644 --- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java @@ -21,12 +21,14 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient; import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient; import