(flink) branch release-1.17 updated: [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier

2023-12-07 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 2dd1076dbbf [FLINK-33693][checkpoint] Force aligned barrier works with 
timeoutable aligned checkpoint barrier
2dd1076dbbf is described below

commit 2dd1076dbbf0f47e4b5d15e67e0265679397f93e
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Fri Dec 1 16:04:36 2023 +0800

[FLINK-33693][checkpoint] Force aligned barrier works with timeoutable 
aligned checkpoint barrier
---
 .../streaming/runtime/io/RecordWriterOutput.java   |   4 +-
 .../runtime/io/RecordWriterOutputTest.java | 112 +
 2 files changed, 113 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 93acf23d30b..478fe4e3255 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
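@@ -149,9 +149,7 @@ public class RecordWriterOutput<OUT> implements 
WatermarkGaugeExposingOutput<Str
 }
 
 public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) 
throws IOException {
-if (isPriorityEvent
-&& event instanceof CheckpointBarrier
-&& !supportsUnalignedCheckpoints) {
+if (event instanceof CheckpointBarrier && 
!supportsUnalignedCheckpoints) {
 final CheckpointBarrier barrier = (CheckpointBarrier) event;
 event = 
barrier.withOptions(barrier.getCheckpointOptions().withUnalignedUnsupported());
 isPriorityEvent = false;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
new file mode 100644
index 000..d1bf139629e
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0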
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RecordWriterOutput}. */
+class RecordWriterOutputTest {
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testDisableUnalignedCheckpoint(boolean supportsUnalignedCheckpoints) 
throws IOException {
+Queue<Tuple2<AbstractEvent, Boolean>> queue = new LinkedList<>();
+
+RecordWriter<SerializationDelegate<StreamRecord<Long>>> task1 =
+new 
RecordWriterBuilder<SerializationDelegate<StreamRecord<Long>>>()
+.build(
+new MockResultPartitionWriter() {
+@Override
+public void broadcastEvent(
+AbstractEvent event, boolean 
isPriorityEvent) {
+queue.add(Tuple2.of(event, 
isPriorityEvent));
+}
+});
+
+RecordWriterOutput<Long> writerOutput =
+new RecordWriterOutput<>(
+task1, LongSerializer.INSTANCE, null, 
supportsUnalignedCheckpoints);
+
+// Test unalignedBarrier
+CheckpointBarrier unalignedBarrier =
+new CheckpointBarrier(
+0,
+1L,
+CheckpointOptions.unaligned(
+CheckpointType.CHECKPOINT,
+
CheckpointStorageLocationReference.getDefault()));
+
+writerOutput.broadcastEvent(unalignedBarrier, true);
+assertAlignmentTypeAndIsPriorityEvent(
+queue.poll(),
+supportsUnalignedCheckpoints
+? AlignmentType.UNALIGNED
+: AlignmentType.FORCED_ALIGNED,
+supportsUnalignedCheckpoints);
+
+// Test alignedTimeoutBarrier
+CheckpointBarrier alignedTimeoutBarrier =
+new CheckpointBarrier(
+0,
+1L,
+CheckpointOptions.alignedWithTimeout(
+

(flink) branch release-1.18 updated: [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier

2023-12-07 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 3884f65a3d0 [FLINK-33693][checkpoint] Force aligned barrier works with 
timeoutable aligned checkpoint barrier
3884f65a3d0 is described below

commit 3884f65a3d049acd5f092d5b48a1d02ff24b77e6
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Fri Dec 1 16:04:36 2023 +0800

[FLINK-33693][checkpoint] Force aligned barrier works with timeoutable 
aligned checkpoint barrier
---
 .../streaming/runtime/io/RecordWriterOutput.java   |   4 +-
 .../runtime/io/RecordWriterOutputTest.java | 112 +
 2 files changed, 113 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 44e41e92197..43b2d8400b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -182,9 +182,7 @@ public class RecordWriterOutput<OUT>
 }
 
 public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) 
throws IOException {
-if (isPriorityEvent
-&& event instanceof CheckpointBarrier
-&& !supportsUnalignedCheckpoints) {
+if (event instanceof CheckpointBarrier && 
!supportsUnalignedCheckpoints) {
 final CheckpointBarrier barrier = (CheckpointBarrier) event;
 event = 
barrier.withOptions(barrier.getCheckpointOptions().withUnalignedUnsupported());
 isPriorityEvent = false;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
new file mode 100644
index 000..d1bf139629e
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RecordWriterOutput}. */
+class RecordWriterOutputTest {
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testDisableUnalignedCheckpoint(boolean supportsUnalignedCheckpoints) 
throws IOException {
+Queue<Tuple2<AbstractEvent, Boolean>> queue = new LinkedList<>();
+
+RecordWriter<SerializationDelegate<StreamRecord<Long>>> task1 =
+new 
RecordWriterBuilder<SerializationDelegate<StreamRecord<Long>>>()
+.build(
+new MockResultPartitionWriter() {
+@Override
+public void broadcastEvent(
+AbstractEvent event, boolean 
isPriorityEvent) {
+

(flink) 01/02: [FLINK-33480] Implement restore tests for GroupAggregate node

2023-12-07 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 193b1c68976cdfbd66147278f23d7d427d9b5562
Author: bvarghese1 
AuthorDate: Tue Nov 7 17:03:53 2023 -0800

[FLINK-33480] Implement restore tests for GroupAggregate node
---
 .../exec/stream/GroupAggregateRestoreTest.java |  46 ++
 .../exec/stream/GroupAggregateTestPrograms.java| 374 +
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   1 +
 .../plan/group-aggregate-distinct-mini-batch.json  | 621 +
 .../savepoint/_metadata| Bin 0 -> 25689 bytes
 .../plan/group-aggregate-distinct.json | 359 
 .../group-aggregate-distinct/savepoint/_metadata   | Bin 0 -> 27875 bytes
 .../plan/group-aggregate-simple-mini-batch.json| 351 
 .../savepoint/_metadata| Bin 0 -> 9479 bytes
 .../plan/group-aggregate-simple.json   | 272 +
 .../group-aggregate-simple/savepoint/_metadata | Bin 0 -> 10954 bytes
 .../group-aggregate-udf-with-merge-mini-batch.json | 400 +
 .../savepoint/_metadata| Bin 0 -> 17762 bytes
 .../plan/group-aggregate-udf-with-merge.json   | 211 +++
 .../savepoint/_metadata| Bin 0 -> 19881 bytes
 ...oup-aggregate-udf-without-merge-mini-batch.json | 254 +
 .../savepoint/_metadata| Bin 0 -> 12731 bytes
 .../plan/group-aggregate-udf-without-merge.json| 231 
 .../savepoint/_metadata| Bin 0 -> 14618 bytes
 19 files changed, 3120 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
new file mode 100644
index 000..2c5043d9f4d
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecGroupAggregate}. */
+public class GroupAggregateRestoreTest extends RestoreTestBase {
+
+public GroupAggregateRestoreTest() {
+super(StreamExecGroupAggregate.class);
+}
+
+@Override
+public List<TableTestProgram> programs() {
+return Arrays.asList(
+GroupAggregateTestPrograms.GROUP_BY_SIMPLE,
+GroupAggregateTestPrograms.GROUP_BY_SIMPLE_MINI_BATCH,
+GroupAggregateTestPrograms.GROUP_BY_DISTINCT,
+GroupAggregateTestPrograms.GROUP_BY_DISTINCT_MINI_BATCH,
+GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE,
+GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE_MINI_BATCH,
+GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE,
+
GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE_MINI_BATCH);
+}
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
new file mode 100644
index 000..b6a61a72144
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you 

(flink) branch master updated (f751a00fd6f -> fac3ac78667)

2023-12-07 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from f751a00fd6f [FLINK-33758] Implement restore tests for TemporalSort 
node (#23879)
 new 193b1c68976 [FLINK-33480] Implement restore tests for GroupAggregate 
node
 new fac3ac78667 [FLINK-33480] Remove GroupAggJsonPlanTest & 
GroupAggJsonPlanITCase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../exec/stream/GroupAggregateJsonPlanTest.java| 182 --
 .../exec/stream/GroupAggregateRestoreTest.java |  46 +++
 .../exec/stream/GroupAggregateTestPrograms.java| 374 +++
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   1 +
 .../jsonplan/GroupAggregateJsonPlanITCase.java | 205 ---
 ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 314 
 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 402 -
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 304 
 .../plan/group-aggregate-distinct-mini-batch.json} | 220 +--
 .../savepoint/_metadata| Bin 0 -> 25689 bytes
 .../plan/group-aggregate-distinct.json}| 176 -
 .../group-aggregate-distinct/savepoint/_metadata   | Bin 0 -> 27875 bytes
 .../plan/group-aggregate-simple-mini-batch.json}   | 196 +-
 .../savepoint/_metadata| Bin 0 -> 9479 bytes
 .../plan/group-aggregate-simple.json}  | 134 +++
 .../group-aggregate-simple/savepoint/_metadata | Bin 0 -> 10954 bytes
 .../group-aggregate-udf-with-merge-mini-batch.json | 400 
 .../savepoint/_metadata| Bin 0 -> 17762 bytes
 .../plan/group-aggregate-udf-with-merge.json   | 211 +++
 .../savepoint/_metadata| Bin 0 -> 19881 bytes
 ...up-aggregate-udf-without-merge-mini-batch.json} | 225 +---
 .../savepoint/_metadata| Bin 0 -> 12731 bytes
 .../plan/group-aggregate-udf-without-merge.json}   | 208 ---
 .../savepoint/_metadata| Bin 0 -> 14618 bytes
 24 files changed, 1571 insertions(+), 2027 deletions(-)
 delete mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
 create mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
 create mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
 delete mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
 delete mode 100644 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
 delete mode 100644 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
 delete mode 100644 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
 rename 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
 => 
restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/plan/group-aggregate-distinct-mini-batch.json}
 (78%)
 create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/savepoint/_metadata
 rename 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
 => 
restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/plan/group-aggregate-distinct.json}
 (67%)
 create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/savepoint/_metadata
 rename 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
 => 

(flink) 02/02: [FLINK-33480] Remove GroupAggJsonPlanTest & GroupAggJsonPlanITCase

2023-12-07 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fac3ac786674f9b6ce5716902e74b1533ccb1c0a
Author: bvarghese1 
AuthorDate: Tue Nov 7 17:05:20 2023 -0800

[FLINK-33480] Remove GroupAggJsonPlanTest & GroupAggJsonPlanITCase
---
 .../exec/stream/GroupAggregateJsonPlanTest.java| 182 --
 .../jsonplan/GroupAggregateJsonPlanITCase.java | 205 ---
 ...tDistinctAggCalls[isMiniBatchEnabled=false].out | 345 
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 607 -
 ...gCallsWithGroupBy[isMiniBatchEnabled=false].out | 288 --
 ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out | 367 -
 ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 314 ---
 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 402 --
 ...erDefinedAggCalls[isMiniBatchEnabled=false].out | 281 --
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 304 ---
 10 files changed, 3295 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
deleted file mode 100644
index e655a3527ea..000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.CountDistinct;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-/** Test json serialization/deserialization for group aggregate. */
-@ExtendWith(ParameterizedTestExtension.class)
-class GroupAggregateJsonPlanTest extends TableTestBase {
-
-@Parameter private boolean isMiniBatchEnabled;
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@Parameters(name = "isMiniBatchEnabled={0}")
-private static List<Boolean> testData() {
-return Arrays.asList(true, false);
-}
-
-@BeforeEach
-void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-tEnv = util.getTableEnv();
-if (isMiniBatchEnabled) {
-tEnv.getConfig()
-.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
true)
-.set(
-
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-Duration.ofSeconds(10))
-.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
-} else {
-
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
false);
-}
-
-String srcTableDdl =
-"CREATE TABLE MyTable (\n"
-+ "  a bigint,\n"
-+ "  b int not null,\n"
-+ "  c varchar,\n"
-+ 

(flink) branch master updated: [FLINK-33758] Implement restore tests for TemporalSort node (#23879)

2023-12-07 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f751a00fd6f [FLINK-33758] Implement restore tests for TemporalSort 
node (#23879)
f751a00fd6f is described below

commit f751a00fd6f0e70187d2a9ae2ccd6a728d9a2c64
Author: James Hughes 
AuthorDate: Thu Dec 7 07:51:40 2023 -0500

[FLINK-33758] Implement restore tests for TemporalSort node (#23879)
---
 .../exec/stream/TemporalSortJsonPlanTest.java  |  68 
 .../nodes/exec/stream/TemporalSortRestoreTest.java |  40 +
 .../exec/stream/TemporalSortTestPrograms.java  |  96 +++
 .../stream/jsonplan/TemporalSortJsonITCase.java|  87 --
 .../plan/temporal-sort-proctime.json}  | 180 +++--
 .../temporal-sort-proctime/savepoint/_metadata | Bin 0 -> 6037 bytes
 .../plan/temporal-sort-rowtime.json}   | 144 +++--
 .../temporal-sort-rowtime/savepoint/_metadata  | Bin 0 -> 10729 bytes
 8 files changed, 210 insertions(+), 405 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java
deleted file mode 100644
index 8ebd6a47e59..000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for temporal sort. */
-class TemporalSortJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-
-@BeforeEach
-void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-TableEnvironment tEnv = util.getTableEnv();
-
-String srcTableDdl =
-"CREATE TABLE MyTable (\n"
-+ " a INT,\n"
-+ " b BIGINT,\n"
-+ " c VARCHAR,\n"
-+ " `rowtime` AS TO_TIMESTAMP(c),\n"
-+ " proctime as PROCTIME(),\n"
-+ " WATERMARK for `rowtime` AS `rowtime` - INTERVAL 
'1' SECOND\n"
-+ ") WITH (\n"
-+ " 'connector' = 'values')\n";
-tEnv.executeSql(srcTableDdl);
-
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ " a INT\n"
-+ ") WITH (\n"
-+ " 'connector' = 'values')\n";
-tEnv.executeSql(sinkTableDdl);
-}
-
-@Test
-void testSortProcessingTime() {
-util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by 
proctime, c");
-}
-
-@Test
-void testSortRowTime() {
-util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by 
rowtime, c");
-}
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java
new file mode 100644
index 000..f6a1b0fcea9
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * 

(flink-kubernetes-operator) branch main updated: [Flink 31966] Flink Kubernetes operator lacks TLS support

2023-12-07 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 30566ed7 [Flink 31966] Flink Kubernetes operator lacks TLS support
30566ed7 is described below

commit 30566ed7390a4fce1dc3e239efe3c519c55aa9b1
Author: Tony Garrard 
AuthorDate: Thu Dec 7 12:16:39 2023 +

[Flink 31966] Flink Kubernetes operator lacks TLS support

Signed-off-by: A. Garrard 
---
 docs/content/docs/operations/helm.md   |   4 +
 examples/README.md |   5 +
 .../basic-secure-deployment-only.yaml  |  49 
 .../basic-secure-session-job-only.yaml |  42 +++
 examples/flink-tls-example/basic-secure.yaml   |  47 +++
 examples/flink-tls-example/pre-install.yaml| 106 +++
 .../operator/service/AbstractFlinkService.java |  68 -
 .../operator/service/NativeFlinkService.java   |   2 +-
 .../operator/service/StandaloneFlinkService.java   |   2 +-
 .../flink/kubernetes/operator/utils/EnvUtils.java  |   3 +
 .../kubernetes/operator/TestingFlinkService.java   |   2 +-
 .../operator/service/AbstractFlinkServiceTest.java |   2 +-
 .../operator/service/SecureFlinkServiceTest.java   | 319 +
 .../src/test/resources/keystore.jks| Bin 0 -> 3018 bytes
 .../src/test/resources/truststore.jks  | Bin 0 -> 861 bytes
 .../templates/flink-operator.yaml  |  21 ++
 helm/flink-kubernetes-operator/values.yaml |   8 +
 17 files changed, 671 insertions(+), 9 deletions(-)

diff --git a/docs/content/docs/operations/helm.md 
b/docs/content/docs/operations/helm.md
index bb84a2b5..dee6efd0 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -111,6 +111,10 @@ The configurable parameters of the Helm chart and which 
default values as detail
 | operatorHealth.livenessProbe   | Liveness probe 
configuration for the operator using the health endpoint. Only time settings 
should be configured, endpoint is set automatically based on port. |



  [...]
 | operatorHealth.startupProbe| Startup probe configuration 
for the operator using the health endpoint. Only time settings should be 
configured, endpoint is set automatically based on port.  | 



 [...]
 | postStart  | The postStart hook 
configuration for the main container.   
|   



   [...]
+| tls.create | Whether to mount an 
optional secret containing a tls truststore for the flink-kubernetes-operator.  
   | false  



  [...]
+| tls.secretName | The name of the tls secret  

   | flink-operator-cert



  [...]
+| tls.secretKeyRef.name | The name of the secret containing 
the password for the java keystore/truststore   
 | operator-certificate-password



(flink-kubernetes-operator) branch main updated: Add Java client library update process to the Upgrade page (#723)

2023-12-07 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 98ad5b82 Add Java client library update process to the Upgrade page 
(#723)
98ad5b82 is described below

commit 98ad5b822b999c3079873993d093af8a77ed4002
Author: Maximilian Michels 
AuthorDate: Thu Dec 7 12:16:40 2023 +0100

Add Java client library update process to the Upgrade page (#723)
---
 docs/content/docs/operations/upgrade.md | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/operations/upgrade.md 
b/docs/content/docs/operations/upgrade.md
index 3b570a39..1bb3d350 100644
--- a/docs/content/docs/operations/upgrade.md
+++ b/docs/content/docs/operations/upgrade.md
@@ -43,7 +43,13 @@ If you are upgrading from `kubernetes-operator-1.0.0` or 
later, please refer to
 
 We will cover these steps in detail in the next sections.
 
-### 1. Upgrading the CRD
+### 1. Upgrading the Java client library
+
+If you use the Flink Kubernetes operator Java client library, you need to 
update it first to ensure that responses from
+the new operator version can be parsed properly. For minor releases, the new 
version of the Java library is
+backwards-compatible with the previous minor version of the operator.
+
+### 2. Upgrading the CRD
 
 The first step of the upgrade process is upgrading the CRDs for 
`FlinkDeployment` and `FlinkSessionJob` resources.
 This step must be completed manually and is not part of the helm installation 
logic.
@@ -57,7 +63,7 @@ kubectl replace -f 
helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.ap
 Please note that we are using the `replace` command here which ensures that 
running deployments are unaffected.
 {{< /hint >}}
 
-### 2. Upgrading the Helm deployment
+### 3. Upgrading the Helm deployment
 
 {{< hint danger >}}
 Before upgrading, please compare the version difference between the currently 
generated yaml and the running yaml, which will be used for backup and restore.



(flink) branch master updated: [FLINK-33693][checkpoint] Force aligned barrier works with timeoutable aligned checkpoint barrier

2023-12-07 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a4d14ec224b [FLINK-33693][checkpoint] Force aligned barrier works with 
timeoutable aligned checkpoint barrier
a4d14ec224b is described below

commit a4d14ec224bbaf498ca2e912ff51539872a34878
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Fri Dec 1 16:04:36 2023 +0800

[FLINK-33693][checkpoint] Force aligned barrier works with timeoutable 
aligned checkpoint barrier
---
 .../streaming/runtime/io/RecordWriterOutput.java   |   4 +-
 .../runtime/io/RecordWriterOutputTest.java | 112 +
 2 files changed, 113 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 44e41e92197..43b2d8400b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -182,9 +182,7 @@ public class RecordWriterOutput
 }
 
 public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) 
throws IOException {
-if (isPriorityEvent
-&& event instanceof CheckpointBarrier
-&& !supportsUnalignedCheckpoints) {
+if (event instanceof CheckpointBarrier && 
!supportsUnalignedCheckpoints) {
 final CheckpointBarrier barrier = (CheckpointBarrier) event;
 event = 
barrier.withOptions(barrier.getCheckpointOptions().withUnalignedUnsupported());
 isPriorityEvent = false;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
new file mode 100644
index 000..d1bf139629e
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordWriterOutputTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RecordWriterOutput}. */
+class RecordWriterOutputTest {
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testDisableUnalignedCheckpoint(boolean supportsUnalignedCheckpoints) 
throws IOException {
+Queue<Tuple2<AbstractEvent, Boolean>> queue = new LinkedList<>();
+
+RecordWriter<SerializationDelegate<StreamRecord<Long>>> task1 =
+new 
RecordWriterBuilder<SerializationDelegate<StreamRecord<Long>>>()
+.build(
+new MockResultPartitionWriter() {
+@Override
+public void broadcastEvent(
+AbstractEvent event, boolean 
isPriorityEvent) {
+

(flink-connector-shared-utils) branch parent_pom updated: [FLINK-32894] Use 3.5.1 for maven-shade-plugin to support Java 17/21

2023-12-07 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch parent_pom
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git


The following commit(s) were added to refs/heads/parent_pom by this push:
 new 7c2745a  [FLINK-32894] Use 3.5.1 for maven-shade-plugin to support 
Java 17/21
7c2745a is described below

commit 7c2745af777c6681b9eb14f0b05b2136fb141784
Author: Qingsheng Ren 
AuthorDate: Thu Dec 7 17:50:32 2023 +0800

[FLINK-32894] Use 3.5.1 for maven-shade-plugin to support Java 17/21
---
 pom.xml | 7 +--
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/pom.xml b/pom.xml
index fb93bac..3afa1b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -437,7 +437,7 @@ under the License.
 
 org.apache.maven.plugins
 maven-shade-plugin
-3.1.1
+3.5.1
 
 
 
@@ -877,11 +877,6 @@ under the License.
 
 
 
-
-org.apache.maven.plugins
-maven-shade-plugin
-3.2.4
-
 
 io.github.zentol.japicmp
 japicmp-maven-plugin



(flink-connector-shared-utils) branch ci_utils updated: [FLINK-33556][CI] Test infrastructure for externalized python code

2023-12-07 Thread gaborgsomogyi
This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch ci_utils
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git


The following commit(s) were added to refs/heads/ci_utils by this push:
 new 7691962  [FLINK-33556][CI] Test infrastructure for externalized python 
code
7691962 is described below

commit 7691962b8031536a2baa4d39252a38198ba91dc5
Author: pvary 
AuthorDate: Thu Dec 7 10:33:52 2023 +0100

[FLINK-33556][CI] Test infrastructure for externalized python code
---
 .github/workflows/python_ci.yml |  86 
 python/README.md|   6 +
 python/build-wheels.sh  |  52 +++
 python/glibc_version_fix.h  |  17 +
 python/install_command.sh   |  31 ++
 python/lint-python.sh   | 876 
 6 files changed, 1068 insertions(+)

diff --git a/.github/workflows/python_ci.yml b/.github/workflows/python_ci.yml
new file mode 100644
index 000..89e10d9
--- /dev/null
+++ b/.github/workflows/python_ci.yml
@@ -0,0 +1,86 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+on:
+  workflow_call:
+inputs:
+  flink_version:
+description: "Flink version to test against."
+required: true
+type: string
+  timeout_global:
+description: "The timeout in minutes for the entire workflow."
+required: false
+type: number
+default: 80
+  timeout_test:
+description: "The timeout in minutes for the test compile"
+required: false
+type: number
+default: 50
+  connector_branch:
+description: "Branch that needs to be checked out"
+required: false
+type: string
+
+jobs:
+  python_test:
+runs-on: ubuntu-latest
+timeout-minutes: ${{ inputs.timeout_global }}
+env:
+  MVN_COMMON_OPTIONS: -U -B --no-transfer-progress -Dflink.version=${{ 
inputs.flink_version }} -DskipTests
+  MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false 
-Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+  MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out"
+steps:
+  - run: echo "Running CI pipeline for JDK version 8"
+
+  - name: Check out repository code
+uses: actions/checkout@v3
+with:
+  ref: "${{ inputs.connector_branch }}"
+
+  - name: Set JDK
+uses: actions/setup-java@v3
+with:
+  java-version: 8
+  distribution: 'temurin'
+  cache: 'maven'
+
+  - name: Set Maven 3.8.6
+uses: stCarolas/setup-maven@v4.5
+with:
+  maven-version: 3.8.6
+
+  - name: Compile
+timeout-minutes: ${{ inputs.timeout_test }}
+run: |
+  set -o pipefail
+
+  mvn clean install ${MVN_COMMON_OPTIONS} \
+${{ env.MVN_CONNECTION_OPTIONS }} \
+-Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties \
+| tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
+
+  - name: Run Python test
+timeout-minutes: ${{ inputs.timeout_test }}
+run: |
+  set -o pipefail
+
+  cd flink-python
+  chmod a+x dev/* 
+  ./dev/lint-python.sh -e mypy,sphinx | tee ${{ 
env.MVN_BUILD_OUTPUT_FILE }}
diff --git a/python/README.md b/python/README.md
new file mode 100644
index 000..0dc8217
--- /dev/null
+++ b/python/README.md
@@ -0,0 +1,6 @@
+This directory contains commonly used scripts for testing and creating python 
packages for connectors.
+
+The original versions of the files are based on
+https://github.com/apache/flink/tree/release-1.17.2/flink-python/dev.
+
+FLINK-33762 was created to version these scripts so that backward-incompatible 
changes can be made in the future.
diff --git a/python/build-wheels.sh b/python/build-wheels.sh
new file mode 100755
index 000..7f18a91
--- /dev/null
+++ b/python/build-wheels.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license 

(flink) branch master updated (e4f389895d9 -> ca1c7ce4812)

2023-12-07 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from e4f389895d9 [FLINK-33556] Test infrastructure for externalized python 
code
 add ca1c7ce4812 [FLINK-33666][table] Use the same constraint name in 
MergeTableLikeUtil and Schema

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/table/sql/show.md |  4 +-
 docs/content/docs/dev/table/sql/show.md|  4 +-
 .../src/test/resources/sql/table.q | 82 +++---
 .../src/test/resources/sql/table.q |  8 +--
 .../planner/operations/MergeTableLikeUtil.java |  7 +-
 .../planner/operations/MergeTableLikeUtilTest.java |  4 +-
 .../testChangelogSource.out|  2 +-
 .../testUpsertSource.out   |  2 +-
 ...WithNonDeterministicFuncSinkWithDifferentPk.out |  4 +-
 .../testJoinTemporalFunction.out   |  2 +-
 .../testTemporalTableJoin.out  |  2 +-
 .../plan/deduplicate-asc-proctime.json |  4 +-
 .../deduplicate-asc/plan/deduplicate-asc.json  |  4 +-
 .../deduplicate-desc/plan/deduplicate-desc.json|  4 +-
 .../stream-exec-expand_1/expand/plan/expand.json   |  4 +-
 15 files changed, 71 insertions(+), 66 deletions(-)



(flink) branch master updated (4eb5b588e4d -> e4f389895d9)

2023-12-07 Thread gaborgsomogyi
This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 4eb5b588e4d [FLINK-33726][sql-client] print time cost for streaming 
queries
 add e4f389895d9 [FLINK-33556] Test infrastructure for externalized python 
code

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/pyflink_gateway_server.py | 17 +++--
 1 file changed, 11 insertions(+), 6 deletions(-)



(flink-kubernetes-operator) branch main updated: [FLINK-33645] Taskmanager env vars in config not given to taskmanager

2023-12-07 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new e4528260 [FLINK-33645] Taskmanager env vars in config not given to 
taskmanager
e4528260 is described below

commit e4528260be78dabeb6552afda10e78802021984d
Author: Tony Garrard 
AuthorDate: Thu Dec 7 08:11:48 2023 +

[FLINK-33645] Taskmanager env vars in config not given to taskmanager


Signed-off-by: A. Garrard 
---
 .../StandaloneKubernetesTaskManagerParameters.java |  8 ++--
 .../operator/kubeclient/utils/TestUtils.java   |  5 +
 .../KubernetesStandaloneClusterDescriptorTest.java | 22 ++
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
index de313f27..2517b756 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
@@ -19,6 +19,8 @@ package 
org.apache.flink.kubernetes.operator.kubeclient.parameters;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -73,8 +75,10 @@ public class StandaloneKubernetesTaskManagerParameters 
extends AbstractKubernete
 
 @Override
 public Map<String, String> getEnvironments() {
-// TMs have environment set using the pod template.
-return new HashMap<>();
+// TMs have environment set using the pod template and config 
containerized.taskmanager.env
+return new HashMap<>(
+ConfigurationUtils.getPrefixedKeyValuePairs(
+
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX, flinkConfig));
 }
 
 @Override
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
index 8d73dd10..203275f8 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
@@ -54,6 +54,11 @@ public class TestUtils {
 public static final double TASK_MANAGER_CPU = 4;
 public static final double JOB_MANAGER_CPU = 2;
 
+public static final String USER_ENV_VAR = "USER_ENV";
+
+public static final String JM_ENV_VALUE = "TEST_JM";
+public static final String TM_ENV_VALUE = "TEST_TM";
+
 public static Map<String, String> generateTestStringStringMap(
 String keyPrefix, String valuePrefix, int entries) {
 Map<String, String> map = new HashMap<>();
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
index 921bccf0..b53c51a8 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -21,12 +21,14 @@ import 
org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import 
org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
 import 
org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
 import