This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 24decb50cf3 [BEAM-14503] Add support for Flink 1.15 (#17739)
24decb50cf3 is described below

commit 24decb50cf3e14e8cce4dd9d82b8963bf57a1805
Author: Julien Tournay <boudhe...@gmail.com>
AuthorDate: Wed May 25 19:59:04 2022 +0200

    [BEAM-14503] Add support for Flink 1.15 (#17739)
    
    * Flink 1.15 runner
    
    * Ignore Flink 1.11
---
 gradle.properties                                  |  2 +-
 .../streaming/ProcessingTimeCallbackCompat.java    | 22 +++++++++++
 .../beam/runners/flink/MiniClusterCompat.java      | 29 ++++++++++++++
 runners/flink/1.15/build.gradle                    | 34 +++++++++++++++++
 .../flink/1.15/job-server-container/build.gradle   | 26 +++++++++++++
 runners/flink/1.15/job-server/build.gradle         | 31 +++++++++++++++
 .../streaming/ProcessingTimeCallbackCompat.java    | 22 +++++++++++
 .../beam/runners/flink/MiniClusterCompat.java      | 30 +++++++++++++++
 runners/flink/flink_runner.gradle                  | 44 ++++++++++++++++------
 .../flink/FlinkStreamingTransformTranslators.java  |  4 +-
 .../wrappers/streaming/DoFnOperator.java           |  3 +-
 .../streaming/io/UnboundedSourceWrapper.java       |  4 +-
 .../flink/FlinkRequiresStableInputTest.java        |  2 +-
 .../beam/runners/flink/FlinkSavepointTest.java     |  2 +-
 .../python/apache_beam/options/pipeline_options.py |  2 +-
 settings.gradle.kts                                |  4 ++
 16 files changed, 240 insertions(+), 21 deletions(-)

diff --git a/gradle.properties b/gradle.properties
index f880631cd68..d1528eb2cd0 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -37,5 +37,5 @@ javaVersion=1.8
 docker_image_default_repo_root=apache
 docker_image_default_repo_prefix=beam_
 
-flink_versions=1.12,1.13,1.14
+flink_versions=1.12,1.13,1.14,1.15
 
diff --git 
a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
 
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
new file mode 100644
index 00000000000..a494fec01dd
--- /dev/null
+++ 
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+
+public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {}
diff --git 
a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
 
b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
new file mode 100644
index 00000000000..1bbcd0159b1
--- /dev/null
+++ 
b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+public class MiniClusterCompat {
+  public static CompletableFuture<String> triggerSavepoint(
+      MiniCluster cluster, JobID jobId, String targetDirectory, boolean 
cancelJob) {
+    return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob);
+  }
+}
diff --git a/runners/flink/1.15/build.gradle b/runners/flink/1.15/build.gradle
new file mode 100644
index 00000000000..a3b5fb24699
--- /dev/null
+++ b/runners/flink/1.15/build.gradle
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+
+/* All properties required for loading the Flink build script */
+project.ext {
+  // Set the version of all Flink-related dependencies here.
+  flink_version = '1.15.0'
+  // Version specific code overrides.
+  main_source_overrides = ["${basePath}/1.12/src/main/java", 
"${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", 
'./src/main/java']
+  test_source_overrides = ["${basePath}/1.12/src/test/java", 
"${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", 
'./src/test/java']
+  main_resources_overrides = []
+  test_resources_overrides = []
+  archives_base_name = 'beam-runners-flink-1.15'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_runner.gradle"
diff --git a/runners/flink/1.15/job-server-container/build.gradle 
b/runners/flink/1.15/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/1.15/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+  resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.15/job-server/build.gradle 
b/runners/flink/1.15/job-server/build.gradle
new file mode 100644
index 00000000000..05ad8feb5b7
--- /dev/null
+++ b/runners/flink/1.15/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+  // Look for the source code in the parent module
+  main_source_dirs = ["$basePath/src/main/java"]
+  test_source_dirs = ["$basePath/src/test/java"]
+  main_resources_dirs = ["$basePath/src/main/resources"]
+  test_resources_dirs = ["$basePath/src/test/resources"]
+  archives_base_name = 'beam-runners-flink-1.15-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
diff --git 
a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
 
b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
new file mode 100644
index 00000000000..1b9baaef3f9
--- /dev/null
+++ 
b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+
+public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {}
diff --git 
a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
 
b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
new file mode 100644
index 00000000000..f02ad36116c
--- /dev/null
+++ 
b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+public class MiniClusterCompat {
+  public static CompletableFuture<String> triggerSavepoint(
+      MiniCluster cluster, JobID jobId, String targetDirectory, boolean 
cancelJob) {
+    return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob, 
SavepointFormatType.DEFAULT);
+  }
+}
diff --git a/runners/flink/flink_runner.gradle 
b/runners/flink/flink_runner.gradle
index 837bc74c32f..18cf7860ff3 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -140,13 +140,42 @@ dependencies {
   implementation library.java.slf4j_api
   implementation library.java.joda_time
   implementation library.java.args4j
-  implementation "org.apache.flink:flink-clients_2.12:$flink_version"
-  // Runtime dependencies are not included in Beam's generated pom.xml, so we 
must declare flink-clients in implementation
-  // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
-  permitUnusedDeclared "org.apache.flink:flink-clients_2.12:$flink_version"
+
+  // Flink 1.15 shades all remaining scala dependencies and therefor does not 
depend on a specific version of Scala anymore
+  if (flink_version.compareTo("1.15") >= 0) {
+    implementation "org.apache.flink:flink-clients:$flink_version"
+    // Runtime dependencies are not included in Beam's generated pom.xml, so 
we must declare flink-clients in implementation
+    // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
+    permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version"
+
+    implementation "org.apache.flink:flink-streaming-java:$flink_version"
+    // RocksDB state backend (included in the Flink distribution)
+    provided "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
+    testImplementation 
"org.apache.flink:flink-statebackend-rocksdb:$flink_version"
+    testImplementation 
"org.apache.flink:flink-streaming-java:$flink_version:tests"
+    testImplementation "org.apache.flink:flink-test-utils:$flink_version"
+
+    miniCluster "org.apache.flink:flink-runtime-web:$flink_version"
+  } else {
+    implementation "org.apache.flink:flink-clients_2.12:$flink_version"
+    // Runtime dependencies are not included in Beam's generated pom.xml, so 
we must declare flink-clients in implementation
+    // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
+    permitUnusedDeclared "org.apache.flink:flink-clients_2.12:$flink_version"
+
+    implementation "org.apache.flink:flink-streaming-java_2.12:$flink_version"
+    // RocksDB state backend (included in the Flink distribution)
+    provided "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
+    testImplementation 
"org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
+    testImplementation 
"org.apache.flink:flink-streaming-java_2.12:$flink_version:tests"
+    testImplementation "org.apache.flink:flink-test-utils_2.12:$flink_version"
+
+    miniCluster "org.apache.flink:flink-runtime-web_2.12:$flink_version"
+  }
+
   implementation "org.apache.flink:flink-core:$flink_version"
   implementation "org.apache.flink:flink-metrics-core:$flink_version"
   implementation "org.apache.flink:flink-java:$flink_version"
+
   if (flink_version.compareTo("1.14") >= 0) {
     implementation "org.apache.flink:flink-runtime:$flink_version"
     implementation "org.apache.flink:flink-optimizer:$flink_version"
@@ -157,10 +186,6 @@ dependencies {
     implementation "org.apache.flink:flink-optimizer_2.12:$flink_version"
     testImplementation 
"org.apache.flink:flink-runtime_2.12:$flink_version:tests"
   }
-  implementation "org.apache.flink:flink-streaming-java_2.12:$flink_version"
-  // RocksDB state backend (included in the Flink distribution)
-  provided "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
-  testImplementation 
"org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
   testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
   // FlinkStateInternalsTest extends abstract StateInternalsTest
   testImplementation project(path: ":runners:core-java", configuration: 
"testRuntimeMigration")
@@ -172,14 +197,11 @@ dependencies {
   testImplementation project(":sdks:java:io:google-cloud-platform")
   testImplementation library.java.jackson_dataformat_yaml
   testImplementation "org.apache.flink:flink-core:$flink_version:tests"
-  testImplementation 
"org.apache.flink:flink-streaming-java_2.12:$flink_version:tests"
-  testImplementation "org.apache.flink:flink-test-utils_2.12:$flink_version"
   testImplementation project(":sdks:java:harness")
   testRuntimeOnly library.java.slf4j_simple
   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
   validatesRunner project(path: ":runners:core-java", configuration: 
"testRuntimeMigration")
   validatesRunner project(project.path)
-  miniCluster "org.apache.flink:flink-runtime-web_2.12:$flink_version"
   implementation project(path: ":model:fn-execution", configuration: "shadow")
   implementation project(path: ":model:pipeline", configuration: "shadow")
   implementation project(path: ":model:job-management", configuration: 
"shadow")
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 18ece765af4..a9db234c45e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import 
org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
@@ -116,7 +117,6 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -1499,7 +1499,7 @@ class FlinkStreamingTransformTranslators {
   static class UnboundedSourceWrapperNoValueWithRecordId<
           OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
       extends RichParallelSourceFunction<WindowedValue<OutputT>>
-      implements ProcessingTimeCallback,
+      implements ProcessingTimeCallbackCompat,
           BeamStoppableFunction,
           CheckpointListener,
           CheckpointedFunction {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index b1016f073d6..611866e7741 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -118,7 +118,6 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.OutputTag;
@@ -896,7 +895,7 @@ public class DoFnOperator<InputT, OutputT>
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")
-  protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback 
callback) {
+  protected void scheduleForCurrentProcessingTime(ProcessingTimeCallbackCompat 
callback) {
     // We are scheduling a timer for advancing the watermark, to not delay 
finishing the bundle
     // and temporarily release the checkpoint lock. Otherwise, we could 
potentially loop when a
     // timer keeps scheduling a timer for the same timestamp.
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 1527335b10a..5b6704ed35b 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -28,6 +28,7 @@ import 
org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.utils.Workarounds;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -55,7 +56,6 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
 })
 public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends 
UnboundedSource.CheckpointMark>
     extends 
RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>>
-    implements ProcessingTimeCallback,
+    implements ProcessingTimeCallbackCompat,
         BeamStoppableFunction,
         CheckpointListener,
         CheckpointedFunction {
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
index 406c6fd91be..b8f384502e1 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
@@ -188,7 +188,7 @@ public class FlinkRequiresStableInputTest {
     // try multiple times because the job might not be ready yet
     for (int i = 0; i < 10; i++) {
       try {
-        return flinkCluster.triggerSavepoint(jobID, null, false).get();
+        return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, 
false).get();
       } catch (Exception e) {
         exception = e;
         Thread.sleep(100);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index a5949a6965f..54a5cb2b11c 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -274,7 +274,7 @@ public class FlinkSavepointTest implements Serializable {
     // try multiple times because the job might not be ready yet
     for (int i = 0; i < 10; i++) {
       try {
-        return flinkCluster.triggerSavepoint(jobID, null, false).get();
+        return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, 
false).get();
       } catch (Exception e) {
         exception = e;
         LOG.debug("Exception while triggerSavepoint, trying again", e);
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index 5aa29c0fd96..4c00285c808 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1358,7 +1358,7 @@ class JobServerOptions(PipelineOptions):
 class FlinkRunnerOptions(PipelineOptions):
 
   # These should stay in sync with gradle.properties.
-  PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14']
+  PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15']
 
   @classmethod
   def _add_argparse_args(cls, parser):
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7889afe0526..7cc83b9698b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -84,6 +84,10 @@ include(":runners:flink:1.13:job-server-container")
 include(":runners:flink:1.14")
 include(":runners:flink:1.14:job-server")
 include(":runners:flink:1.14:job-server-container")
+// Flink 1.15
+include(":runners:flink:1.15")
+include(":runners:flink:1.15:job-server")
+include(":runners:flink:1.15:job-server-container")
 /* End Flink Runner related settings */
 include(":runners:twister2")
 include(":runners:google-cloud-dataflow-java")

Reply via email to