[flink] branch release-1.12 updated: [FLINK-21552][runtime] Unreserve managed memory if OpaqueMemoryResource cannot be initialized.

2021-03-03 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
 new 6464749  [FLINK-21552][runtime] Unreserve managed memory if 
OpaqueMemoryResource cannot be initialized.
6464749 is described below

commit 64647490f3e96bdbdfe535654c667f1ead0b026c
Author: Xintong Song 
AuthorDate: Tue Mar 2 16:21:28 2021 +0800

[FLINK-21552][runtime] Unreserve managed memory if OpaqueMemoryResource 
cannot be initialized.

This closes #15057
---
 .../org/apache/flink/runtime/memory/MemoryManager.java  |  7 ++-
 .../memory/MemoryManagerSharedResourcesTest.java| 17 +
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 2ef80fa..cf44e79 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -540,7 +540,12 @@ public class MemoryManager {
 e);
 }
 
-return initializer.apply(size);
+try {
+return initializer.apply(size);
+} catch (Throwable t) {
+releaseMemory(type, size);
+throw t;
+}
 };
 
 final Consumer releaser = (size) -> releaseMemory(type, size);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
index d224a15..9c4fb9e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
@@ -224,6 +224,23 @@ public class MemoryManagerSharedResourcesTest {
 assertTrue(resource.getResourceHandle().closed);
 }
 
+@Test
+public void testAllocateResourceInitializeFail() {
+final MemoryManager memoryManager = createMemoryManager();
+
+try {
+memoryManager.getSharedMemoryResourceForManagedMemory(
+"type",
+(ignore) -> {
+throw new RuntimeException("initialization fail");
+},
+0.1);
+fail("expect to fail");
+} catch (Throwable t) {
+// expected
+}
+assertTrue(memoryManager.verifyEmpty());
+}
 // 
 //  Utils
 // 



[flink] branch master updated (2233cb2 -> 1e71b1c)

2021-03-03 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2233cb2  [FLINK-21576][runtime] Remove legacy 
ExecutionVertex#getPreferredLocations()
 add 1e71b1c  [FLINK-21552][runtime] Unreserve managed memory if 
OpaqueMemoryResource cannot be initialized.

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/runtime/memory/MemoryManager.java  |  7 ++-
 .../memory/MemoryManagerSharedResourcesTest.java| 17 +
 2 files changed, 23 insertions(+), 1 deletion(-)



[flink] branch master updated: [FLINK-21576][runtime] Remove legacy ExecutionVertex#getPreferredLocations()

2021-03-03 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2233cb2  [FLINK-21576][runtime] Remove legacy 
ExecutionVertex#getPreferredLocations()
2233cb2 is described below

commit 2233cb2ebad5908464e36ac890f0ab54bd57d35a
Author: Zhu Zhu 
AuthorDate: Wed Mar 3 14:39:52 2021 +0800

[FLINK-21576][runtime] Remove legacy ExecutionVertex#getPreferredLocations()

It is superseded by DefaultPreferredLocationsRetriever now and is no longer 
used.
Its test ExecutionVertexLocalityTest is superseded by 
DefaultPreferredLocationsRetrieverTest.
---
 .../flink/runtime/executiongraph/Execution.java|  53 -
 .../runtime/executiongraph/ExecutionVertex.java| 105 -
 .../ExecutionVertexLocalityTest.java   | 253 -
 3 files changed, 411 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index d46fb7f..d5fee3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -43,7 +43,6 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -57,7 +56,6 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -1366,57 +1364,6 @@ public class Execution
 //  Miscellaneous
 // 

 
-/**
- * Calculates the preferred locations based on the location preference 
constraint.
- *
- * @param locationPreferenceConstraint constraint for the location 
preference
- * @return Future containing the collection of preferred locations. This 
might not be completed
- * if not all inputs have been a resource assigned.
- */
-@VisibleForTesting
-public CompletableFuture> 
calculatePreferredLocations(
-LocationPreferenceConstraint locationPreferenceConstraint) {
-final Collection> 
preferredLocationFutures =
-getVertex().getPreferredLocations();
-final CompletableFuture> 
preferredLocationsFuture;
-
-switch (locationPreferenceConstraint) {
-case ALL:
-preferredLocationsFuture = 
FutureUtils.combineAll(preferredLocationFutures);
-break;
-case ANY:
-final ArrayList 
completedTaskManagerLocations =
-new ArrayList<>(preferredLocationFutures.size());
-
-for (CompletableFuture 
preferredLocationFuture :
-preferredLocationFutures) {
-if (preferredLocationFuture.isDone()
-&& 
!preferredLocationFuture.isCompletedExceptionally()) {
-final TaskManagerLocation taskManagerLocation =
-preferredLocationFuture.getNow(null);
-
-if (taskManagerLocation == null) {
-throw new FlinkRuntimeException(
-"TaskManagerLocationFuture was completed 
with null. This indicates a programming bug.");
-}
-
-completedTaskManagerLocations.add(taskManagerLocation);
-}
-}
-
-preferredLocationsFuture =
-
CompletableFuture.completedFuture(completedTaskManagerLocations);
-break;
-default:
-throw new RuntimeException(
-"Unknown LocationPreferenceConstraint "
-+ locationPreferenceConstraint
-+ '.');
-}
-
-return preferredLocationsFuture;
-}
-
 public void transitionState(ExecutionState targetState) {
 transitionState(state, 

[flink] branch master updated (0adc5c2 -> f743974)

2021-03-03 Thread liyu
This is an automated email from the ASF dual-hosted git repository.

liyu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0adc5c2  [FLINK-21021][python] Bump Beam to 2.27.0 (#14741)
 new be78727  [FLINK-20496][state backends] Introduce RocksDB metadata 
block size setting.
 new f743974  [FLINK-20496][state backends] RocksDB partitioned 
index/filters option.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../rocksdb_configurable_configuration.html|  6 +++
 .../generated/rocksdb_configuration.html   |  6 +++
 .../generated/state_backend_rocksdb_section.html   |  6 +++
 .../state/DefaultConfigurableOptionsFactory.java   | 24 +
 .../state/RocksDBConfigurableOptions.java  |  9 
 .../state/RocksDBMemoryConfiguration.java  | 19 +++
 .../state/RocksDBMemoryControllerUtils.java|  8 ++-
 .../streaming/state/RocksDBOperationUtils.java |  6 ++-
 .../contrib/streaming/state/RocksDBOptions.java| 14 +
 .../streaming/state/RocksDBResourceContainer.java  | 49 +
 .../streaming/state/RocksDBSharedResources.java| 12 -
 .../state/RocksDBMemoryControllerUtilsTest.java|  5 +-
 .../state/RocksDBResourceContainerTest.java| 62 +-
 .../state/RocksDBStateBackendConfigTest.java   |  5 ++
 14 files changed, 223 insertions(+), 8 deletions(-)



[flink] 02/02: [FLINK-20496][state backends] RocksDB partitioned index/filters option.

2021-03-03 Thread liyu
This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f7439740e8e023d458d2ac0cb2a58682eb9b6beb
Author: liuyufei 
AuthorDate: Wed Dec 9 23:39:32 2020 +0800

[FLINK-20496][state backends] RocksDB partitioned index/filters option.

Configure partitioned index and filters options according to 
'https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters'.

This closes #14341.
---
 .../generated/rocksdb_configuration.html   |  6 +++
 .../generated/state_backend_rocksdb_section.html   |  6 +++
 .../state/RocksDBMemoryConfiguration.java  | 19 +++
 .../state/RocksDBMemoryControllerUtils.java|  8 ++-
 .../streaming/state/RocksDBOperationUtils.java |  6 ++-
 .../contrib/streaming/state/RocksDBOptions.java| 14 +
 .../streaming/state/RocksDBResourceContainer.java  | 49 +
 .../streaming/state/RocksDBSharedResources.java| 12 -
 .../state/RocksDBMemoryControllerUtilsTest.java|  5 +-
 .../state/RocksDBResourceContainerTest.java| 62 +-
 10 files changed, 179 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html 
b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
index 79b769d..74b1236 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -39,6 +39,12 @@
 If set, the RocksDB state backend will automatically configure 
itself to use the managed memory budget of the task slot, and divide the memory 
over write buffers, indexes, block caches, etc. That way, the three major uses 
of memory of RocksDB will be capped.
 
 
+
state.backend.rocksdb.memory.partitioned-index-filters
+false
+Boolean
+With partitioning, the index/filter block of an SST file is 
partitioned into smaller blocks with an additional top-level index on them. 
When reading an index/filter, only top-level index is loaded into memory. The 
partitioned index/filter then uses the top-level index to load on demand into 
the block cache the partitions that are required to perform the index/filter 
query. This option only has an effect when 
'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb [...]
+
+
 state.backend.rocksdb.memory.write-buffer-ratio
 0.5
 Double
diff --git 
a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html 
b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
index 8a74eee..a8ed2c7 100644
--- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
@@ -27,6 +27,12 @@
 If set, the RocksDB state backend will automatically configure 
itself to use the managed memory budget of the task slot, and divide the memory 
over write buffers, indexes, block caches, etc. That way, the three major uses 
of memory of RocksDB will be capped.
 
 
+
state.backend.rocksdb.memory.partitioned-index-filters
+false
+Boolean
+With partitioning, the index/filter block of an SST file is 
partitioned into smaller blocks with an additional top-level index on them. 
When reading an index/filter, only top-level index is loaded into memory. The 
partitioned index/filter then uses the top-level index to load on demand into 
the block cache the partitions that are required to perform the index/filter 
query. This option only has an effect when 
'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb [...]
+
+
 state.backend.rocksdb.memory.write-buffer-ratio
 0.5
 Double
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
index dda5409..3bfcc03 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
@@ -51,6 +51,9 @@ public final class RocksDBMemoryConfiguration implements 
Serializable {
  */
 @Nullable private Double highPriorityPoolRatio;
 
+/** Flag whether to use partition index/filters. Null if not set. */
+@Nullable private Boolean usePartitionedIndexFilters;
+
 // 
 
 /**
@@ -166,6 +169,17 @@ public final class 

[flink] 01/02: [FLINK-20496][state backends] Introduce RocksDB metadata block size setting.

2021-03-03 Thread liyu
This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be78727ae125c036cbb297d020e8a7ad23aae083
Author: liuyufei 
AuthorDate: Wed Dec 9 23:38:49 2020 +0800

[FLINK-20496][state backends] Introduce RocksDB metadata block size setting.
---
 .../rocksdb_configurable_configuration.html|  6 ++
 .../state/DefaultConfigurableOptionsFactory.java   | 24 ++
 .../state/RocksDBConfigurableOptions.java  |  9 
 .../state/RocksDBStateBackendConfigTest.java   |  5 +
 4 files changed, 44 insertions(+)

diff --git 
a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html 
b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
index 29142bb..a55c319 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
@@ -21,6 +21,12 @@
 The amount of the cache for data blocks in RocksDB. RocksDB 
has default block-cache size as '8MB'.
 
 
+state.backend.rocksdb.block.metadata-blocksize
+(none)
+MemorySize
+Approximate size of partitioned metadata packed per block. 
Currently applied to indexes block when partitioned index/filters option is 
enabled. RocksDB has default metadata blocksize as '4KB'.
+
+
 
state.backend.rocksdb.compaction.level.max-size-level-base
 (none)
 MemorySize
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 94ae969..f5dd310 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -47,6 +47,7 @@ import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_OPEN_FILES;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.METADATA_BLOCK_SIZE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE;
@@ -131,6 +132,10 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableRocksDBOpt
 blockBasedTableConfig.setBlockSize(getBlockSize());
 }
 
+if (isOptionConfigured(METADATA_BLOCK_SIZE)) {
+blockBasedTableConfig.setMetadataBlockSize(getMetadataBlockSize());
+}
+
 if (isOptionConfigured(BLOCK_CACHE_SIZE)) {
 blockBasedTableConfig.setBlockCacheSize(getBlockCacheSize());
 }
@@ -316,6 +321,23 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableRocksDBOpt
 }
 
 // 
--
+// Approximate size of partitioned metadata packed per block.
+// Currently applied to indexes block when partitioned index/filters 
option is enabled.
+// 
--
+
+private long getMetadataBlockSize() {
+return MemorySize.parseBytes(getInternal(METADATA_BLOCK_SIZE.key()));
+}
+
+public DefaultConfigurableOptionsFactory setMetadataBlockSize(String 
metadataBlockSize) {
+Preconditions.checkArgument(
+MemorySize.parseBytes(metadataBlockSize) > 0,
+"Invalid configuration " + metadataBlockSize + " for metadata 
block size.");
+setInternal(METADATA_BLOCK_SIZE.key(), metadataBlockSize);
+return this;
+}
+
+// 
--
 // The amount of the cache for data blocks in RocksDB
 // 
--
 
@@ -348,6 +370,7 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableRocksDBOpt
 MAX_WRITE_BUFFER_NUMBER,
 MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 BLOCK_SIZE,
+ 

[flink] branch master updated (951e2a5 -> 0adc5c2)

2021-03-03 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 951e2a5  [hotfix][docs] Fix merge conflicts of checkpointing.md
 add 0adc5c2  [FLINK-21021][python] Bump Beam to 2.27.0 (#14741)

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/deployment/cli.md |  4 +-
 .../docs/dev/python/datastream_tutorial.md |  2 +-
 docs/content.zh/docs/dev/python/installation.md|  4 +-
 .../docs/dev/python/table/udfs/python_udfs.md  |  2 +-
 .../python/table/udfs/vectorized_python_udfs.md|  2 +-
 .../docs/dev/python/table_api_tutorial.md  |  2 +-
 docs/content.zh/docs/dev/table/sqlClient.md|  4 +-
 docs/content.zh/docs/flinkDev/building.md  |  4 +-
 docs/content/docs/deployment/cli.md|  4 +-
 .../content/docs/dev/python/datastream_tutorial.md |  2 +-
 docs/content/docs/dev/python/installation.md   |  4 +-
 .../docs/dev/python/table/udfs/python_udfs.md  |  2 +-
 .../python/table/udfs/vectorized_python_udfs.md|  2 +-
 docs/content/docs/dev/python/table_api_tutorial.md |  2 +-
 docs/content/docs/dev/table/sqlClient.md   |  4 +-
 docs/content/docs/flinkDev/building.md |  4 +-
 .../shortcodes/generated/python_configuration.html |  2 +-
 .../apache/flink/client/cli/CliFrontendParser.java |  2 +-
 flink-python/README.md |  2 +-
 flink-python/dev/build-wheels.sh   |  2 +-
 flink-python/dev/dev-requirements.txt  |  2 +-
 flink-python/dev/lint-python.sh|  6 +--
 flink-python/pom.xml   |  7 
 flink-python/pyflink/__init__.py   |  4 +-
 .../datastream/stream_execution_environment.py |  6 +--
 flink-python/pyflink/table/table_config.py |  4 +-
 .../pyflink/table/tests/test_pandas_conversion.py  | 18 -
 flink-python/setup.py  | 16 
 .../org/apache/flink/python/PythonOptions.java |  4 +-
 flink-python/src/main/resources/META-INF/NOTICE| 44 +++---
 flink-python/tox.ini   |  4 +-
 pom.xml|  2 +-
 tools/releasing/create_binary_release.sh   |  4 +-
 33 files changed, 99 insertions(+), 78 deletions(-)



[flink-web] branch asf-site updated: Rebuild website (release-1.12.2)

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new de636e0  Rebuild website (release-1.12.2)
de636e0 is described below

commit de636e05296140091705206185c8eb4cf929b64b
Author: Roman Khachatryan 
AuthorDate: Wed Mar 3 18:46:43 2021 +0100

Rebuild website (release-1.12.2)
---
 content/news/2021/03/03/release-1.12.2.html | 475 
 1 file changed, 475 insertions(+)

diff --git a/content/news/2021/03/03/release-1.12.2.html 
b/content/news/2021/03/03/release-1.12.2.html
new file mode 100644
index 000..07db85a
--- /dev/null
+++ b/content/news/2021/03/03/release-1.12.2.html
@@ -0,0 +1,475 @@
+
+
+  
+
+
+
+
+Apache Flink: Apache Flink 1.12.2 Released
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+  
+
+
+
+
+
+
+
+  
+ 
+
+
+
+
+
+
+  
+
+
+
+  
+  
+
+  
+
+  
+
+
+
+
+  
+
+
+
+
+
+
+
+What is Apache 
Flink?
+
+
+
+
+
+What is Stateful 
Functions?
+
+
+Use Cases
+
+
+Powered By
+
+
+
+
+
+
+Downloads
+
+
+
+  Getting Started
+  
+https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/index.html;
 target="_blank">With Flink 
+https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/getting-started/project-setup.html;
 target="_blank">With Flink Stateful Functions 
+Training Course
+  
+
+
+
+
+  Documentation
+  
+https://ci.apache.org/projects/flink/flink-docs-release-1.12; 
target="_blank">Flink 1.12 (Latest stable release) 
+https://ci.apache.org/projects/flink/flink-docs-master; 
target="_blank">Flink Master (Latest Snapshot) 
+https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2; 
target="_blank">Flink Stateful Functions 2.2 (Latest stable release) 

+https://ci.apache.org/projects/flink/flink-statefun-docs-master; 
target="_blank">Flink Stateful Functions Master (Latest Snapshot) 
+  
+
+
+
+Getting Help
+
+
+Flink Blog
+
+
+
+
+  https://flink-packages.org; 
target="_blank">flink-packages.org 
+
+
+
+
+
+
+Community & Project Info
+
+
+Roadmap
+
+
+How to 
Contribute
+
+
+
+
+  https://github.com/apache/flink; target="_blank">Flink 
on GitHub 
+
+
+
+
+
+
+  
+
+  
+  中文版
+
+  
+
+
+  
+
+  
+.smalllinks:link {
+  display: inline-block !important; background: none; padding-top: 
0px; padding-bottom: 0px; padding-right: 0px; min-width: 75px;
+}
+  
+
+  
+  
+
+
+https://twitter.com/apacheflink; 
target="_blank">@ApacheFlink 
+
+
+Plan Visualizer 
+
+
+  Flink Security
+
+
+  
+
+https://apache.org; target="_blank">Apache Software 
Foundation 
+
+
+
+  https://www.apache.org/licenses/; 
target="_blank">License 
+
+  https://www.apache.org/security/; 
target="_blank">Security 
+
+  https://www.apache.org/foundation/sponsorship.html; 
target="_blank">Donate 
+
+  https://www.apache.org/foundation/thanks.html; target="_blank">Thanks 

+
+
+  
+
+
+
+  
+  
+  
+  
+
+  Apache Flink 1.12.2 Released
+  
+
+  
+03 Mar 2021 Yuan Mei   Roman Khachatryan 
+
+The Apache Flink community released the next bugfix version of the Apache 
Flink 1.12 series.
+
+This release includes 83 fixes and minor improvements for Flink 1.12.1. The 
list below includes a detailed list of all fixes and improvements.
+
+We highly recommend all users to upgrade to Flink 1.12.2.
+
+Updated Maven dependencies:
+
+dependency
+  groupIdorg.apache.flink/groupId
+  artifactIdflink-java/artifactId
+  version1.12.2/version
+/dependency
+dependency
+  groupIdorg.apache.flink/groupId
+  

[flink-web] 04/04: Rebuild website (release-1.12.2)

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit fb9a6ea45d3d8bf944697ef619aeb6b21973965e
Author: Roman Khachatryan 
AuthorDate: Wed Mar 3 16:34:02 2021 +0100

Rebuild website (release-1.12.2)
---
 content/blog/feed.xml  | 256 -
 content/blog/index.html|  38 +++---
 content/blog/page10/index.html |  38 +++---
 content/blog/page11/index.html |  40 ---
 content/blog/page12/index.html |  40 ---
 content/blog/page13/index.html |  40 ---
 content/blog/page14/index.html |  40 ---
 content/blog/page15/index.html |  25 
 content/blog/page2/index.html  |  38 +++---
 content/blog/page3/index.html  |  41 ---
 content/blog/page4/index.html  |  39 ---
 content/blog/page5/index.html  |  36 +++---
 content/blog/page6/index.html  |  36 +++---
 content/blog/page7/index.html  |  36 +++---
 content/blog/page8/index.html  |  38 +++---
 content/blog/page9/index.html  |  38 +++---
 content/downloads.html |  33 --
 content/index.html |   8 +-
 content/q/gradle-quickstart.sh |   2 +-
 content/q/quickstart-scala.sh  |   2 +-
 content/q/quickstart.sh|   2 +-
 content/q/sbt-quickstart.sh|   2 +-
 content/zh/downloads.html  |  37 +++---
 content/zh/index.html  |   8 +-
 24 files changed, 651 insertions(+), 262 deletions(-)

diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index c63e53b..750a7af 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,233 @@
 https://flink.apache.org/blog/feed.xml; rel="self" 
type="application/rss+xml" />
 
 
+Apache Flink 1.12.2 Released
+pThe Apache Flink community released the next bugfix 
version of the Apache Flink 1.12 series./p
+
+pThis release includes 83 fixes and minor improvements for Flink 
1.12.1. The list below includes a detailed list of all fixes and 
improvements./p
+
+pWe highly recommend all users to upgrade to Flink 1.12.2./p
+
+pUpdated Maven dependencies:/p
+
+div class=highlightprecode 
class=language-xmlspan 
class=ntlt;dependencygt;/span
+  span 
class=ntlt;groupIdgt;/spanorg.apache.flinkspan
 class=ntlt;/groupIdgt;/span
+  span 
class=ntlt;artifactIdgt;/spanflink-javaspan
 class=ntlt;/artifactIdgt;/span
+  span 
class=ntlt;versiongt;/span1.12.2span 
class=ntlt;/versiongt;/span
+span class=ntlt;/dependencygt;/span
+span class=ntlt;dependencygt;/span
+  span 
class=ntlt;groupIdgt;/spanorg.apache.flinkspan
 class=ntlt;/groupIdgt;/span
+  span 
class=ntlt;artifactIdgt;/spanflink-streaming-java_2.11span
 class=ntlt;/artifactIdgt;/span
+  span 
class=ntlt;versiongt;/span1.12.2span 
class=ntlt;/versiongt;/span
+span class=ntlt;/dependencygt;/span
+span class=ntlt;dependencygt;/span
+  span 
class=ntlt;groupIdgt;/spanorg.apache.flinkspan
 class=ntlt;/groupIdgt;/span
+  span 
class=ntlt;artifactIdgt;/spanflink-clients_2.11span
 class=ntlt;/artifactIdgt;/span
+  span 
class=ntlt;versiongt;/span1.12.2span 
class=ntlt;/versiongt;/span
+span 
class=ntlt;/dependencygt;/span/code/pre/div
+
+pYou can find the binaries on the updated a 
href=/downloads.htmlDownloads page/a./p
+
+pList of resolved issues:/p
+
+h2Sub-task
+/h2
+ul
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-21070FLINK-21070/a;]
 - Overloaded aggregate functions cause converter errors
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-21486FLINK-21486/a;]
 - Add sanity check when switching from Rocks to Heap timers
+/li
+/ul
+
+h2Bug
+/h2
+ul
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-12461FLINK-12461/a;]
 - Document binary compatibility situation with Scala beyond 2.12.8
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-16443FLINK-16443/a;]
 - Fix wrong fix for user-code CheckpointExceptions
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-19771FLINK-19771/a;]
 - NullPointerException when accessing null array from postgres in JDBC 
Connector
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-20309FLINK-20309/a;]
 - UnalignedCheckpointTestBase.execute is failed
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-20462FLINK-20462/a;]
 - MailboxOperatorTest.testAvoidTaskStarvation
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-20500FLINK-20500/a;]
 - UpsertKafkaTableITCase.testTemporalJoin test failed
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-20565FLINK-20565/a;]
 - Fix typo in EXPLAIN Statements docs.
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-20580FLINK-20580/a;]
 - Missing null value handling for SerializedValue#39;s 
getByteArray() 
+/li
+li[a 
href=https://issues.apache.org/jira/browse/FLINK-20654FLINK-20654/a;]
 - Unaligned checkpoint recovery may lead to corrupted data stream
+/li
+li[a 

[flink-web] 01/04: Add Apache Flink 1.12.2 release

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit ee40eaf059770bd212d0e6181c11a98b9dc01f08
Author: Roman Khachatryan 
AuthorDate: Tue Feb 16 19:30:34 2021 +0100

Add Apache Flink 1.12.2 release
---
 _config.yml |  42 
 _posts/2021-02-16-release-1.12.2.md | 205 
 q/gradle-quickstart.sh  |   2 +-
 q/quickstart-scala.sh   |   2 +-
 q/quickstart.sh |   2 +-
 q/sbt-quickstart.sh |   2 +-
 6 files changed, 232 insertions(+), 23 deletions(-)

diff --git a/_config.yml b/_config.yml
index a89b83e..921369b 100644
--- a/_config.yml
+++ b/_config.yml
@@ -9,7 +9,7 @@ url: https://flink.apache.org
 
 DOCS_BASE_URL: https://ci.apache.org/projects/flink/
 
-FLINK_VERSION_STABLE: 1.12.1
+FLINK_VERSION_STABLE: 1.12.2
 FLINK_VERSION_STABLE_SHORT: "1.12"
 
 FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
@@ -58,32 +58,32 @@ flink_releases:
   -
 version_short: "1.12"
 binary_release:
-  name: "Apache Flink 1.12.1"
+  name: "Apache Flink 1.12.2"
   scala_211:
-id: "1121-download_211"
-url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.tgz;
-asc_url: 
"https://downloads.apache.org/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.tgz.asc;
-sha512_url: 
"https://downloads.apache.org/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.tgz.sha512;
+id: "1122-download_211"
+url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz;
+asc_url: 
"https://downloads.apache.org/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz.asc;
+sha512_url: 
"https://downloads.apache.org/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz.sha512;
   scala_212:
-id: "1121-download_212"
-url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.12.tgz;
-asc_url: 
"https://downloads.apache.org/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.12.tgz.asc;
-sha512_url: 
"https://downloads.apache.org/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.12.tgz.sha512;
+id: "1122-download_212"
+url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz;
+asc_url: 
"https://downloads.apache.org/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz.asc;
+sha512_url: 
"https://downloads.apache.org/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz.sha512;
 source_release:
-  name: "Apache Flink 1.12.1"
-  id: "1121-download-source"
-  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.12.1/flink-1.12.1-src.tgz;
-  asc_url: 
"https://downloads.apache.org/flink/flink-1.12.1/flink-1.12.1-src.tgz.asc;
-  sha512_url: 
"https://downloads.apache.org/flink/flink-1.12.1/flink-1.12.1-src.tgz.sha512;
+  name: "Apache Flink 1.12.2"
+  id: "1122-download-source"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.12.2/flink-1.12.2-src.tgz;
+  asc_url: 
"https://downloads.apache.org/flink/flink-1.12.2/flink-1.12.2-src.tgz.asc;
+  sha512_url: 
"https://downloads.apache.org/flink/flink-1.12.2/flink-1.12.2-src.tgz.sha512;
 optional_components:
   -
 name: "Avro SQL Format"
 category: "SQL Formats"
 scala_dependent: false
-id: 1121-sql-format-avro
-url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.12.1/flink-avro-1.12.1.jar
-asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.12.1/flink-avro-1.12.1.jar.asc
-sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.12.1/flink-avro-1.12.1.jar.sha1
+id: 1122-sql-format-avro
+url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.12.2/flink-avro-1.12.2.jar
+asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.12.2/flink-avro-1.12.2.jar.asc
+sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.12.2/flink-avro-1.12.2.jar.sha1
   -
   version_short: "1.11"
   binary_release:
@@ -206,6 +206,10 @@ component_releases:
 release_archive:
 flink:
   - version_short: "1.12"
+version_long: 1.12.2
+release_date: 2021-02-16
+  -
+version_short: "1.12"
 version_long: 1.12.1
 release_date: 2021-01-19
   -
diff --git a/_posts/2021-02-16-release-1.12.2.md 
b/_posts/2021-02-16-release-1.12.2.md
new file mode 100644
index 000..e0bba69
--- /dev/null
+++ b/_posts/2021-02-16-release-1.12.2.md
@@ -0,0 +1,205 @@
+---
+layout: post
+title:  "Apache Flink 1.12.2 Released"
+date:   2021-02-16 00:00:00
+categories: news
+authors:
+- yuan:
+  name: "Yuan Mei"
+- 

[flink-web] 02/04: RC#2 update

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 0859196196e44323087896e873af889980fb92a4
Author: Roman Khachatryan 
AuthorDate: Sat Feb 27 00:11:49 2021 +0100

RC#2 update
---
 _config.yml|  2 +-
 ...ease-1.12.2.md => 2021-02-27-release-1.12.2.md} | 33 --
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/_config.yml b/_config.yml
index 921369b..260d918 100644
--- a/_config.yml
+++ b/_config.yml
@@ -207,7 +207,7 @@ release_archive:
 flink:
   - version_short: "1.12"
 version_long: 1.12.2
-release_date: 2021-02-16
+release_date: 2021-02-27
   -
 version_short: "1.12"
 version_long: 1.12.1
diff --git a/_posts/2021-02-16-release-1.12.2.md 
b/_posts/2021-02-27-release-1.12.2.md
similarity index 84%
rename from _posts/2021-02-16-release-1.12.2.md
rename to _posts/2021-02-27-release-1.12.2.md
index e0bba69..68eaaa2 100644
--- a/_posts/2021-02-16-release-1.12.2.md
+++ b/_posts/2021-02-27-release-1.12.2.md
@@ -1,7 +1,7 @@
 ---
 layout: post
 title:  "Apache Flink 1.12.2 Released"
-date:   2021-02-16 00:00:00
+date:   2021-02-27 00:00:00
 categories: news
 authors:
 - yuan:
@@ -13,7 +13,7 @@ authors:
 
 The Apache Flink community released the next bugfix version of the Apache 
Flink 1.12 series.
 
-This release includes 69 fixes and minor improvements for Flink 1.12.1. The 
list below includes a detailed list of all fixes and improvements.
+This release includes 83 fixes and minor improvements for Flink 1.12.1. The 
list below includes a detailed list of all fixes and improvements.
 
 We highly recommend all users to upgrade to Flink 1.12.2.
 
@@ -46,6 +46,8 @@ List of resolved issues:
 
 [FLINK-21070] - 
Overloaded aggregate functions cause converter errors
 
+[FLINK-21486] - 
Add sanity check when switching from Rocks to Heap timers
+
 
 
 Bug
@@ -65,6 +67,8 @@ List of resolved issues:
 
 [FLINK-20565] - 
Fix typo in EXPLAIN Statements docs.
 
+[FLINK-20580] - 
Missing null value handling for SerializedValues getByteArray() 
+
 [FLINK-20654] - 
Unaligned checkpoint recovery may lead to corrupted data stream
 
 [FLINK-20663] - 
Managed memory may not be released in time when operators use managed 
memory frequently
@@ -103,6 +107,10 @@ List of resolved issues:
 
 [FLINK-21024] - 
Dynamic properties get exposed to jobs main method if user parameters 
are passed
 
+[FLINK-21028] - 
Streaming application didn't stop properly 
+
+[FLINK-21030] - 
Broken job restart for job with disjoint graph
+
 [FLINK-21059] - 
KafkaSourceEnumerator does not honor consumer properties
 
 [FLINK-21069] - 
Configuration parallelism.default doesn't take effect for 
TableEnvironment#explainSql
@@ -129,6 +137,8 @@ List of resolved issues:
 
 [FLINK-21208] - 
pyarrow exception when using window with pandas udaf
 
+[FLINK-21213] - 
e2e test fail with As task is already not running, no longer decline 
checkpoint
+
 [FLINK-21215] - 
Checkpoint was declined because one input stream is finished
 
 [FLINK-21216] - 
StreamPandasConversionTests Fails
@@ -145,8 +155,20 @@ List of resolved issues:
 
 [FLINK-21323] - 
Stop-with-savepoint is not supported by SourceOperatorStreamTask
 
+[FLINK-21351] - 
Incremental checkpoint data would be lost once a non-stop savepoint 
completed
+
 [FLINK-21361] - 
FlinkRelMdUniqueKeys matches on AbstractCatalogTable instead of CatalogTable
 
+[FLINK-21412] - 
pyflink DataTypes.DECIMAL is not available
+
+[FLINK-21452] - 
FLIP-27 sources cannot reliably downscale
+
+[FLINK-21453] - 
BoundedOneInput.endInput is NOT called when doing stop with savepoint WITH 
drain
+
+[FLINK-21490] - 
UnalignedCheckpointITCase fails on azure
+
+[FLINK-21492] - 
ActiveResourceManager swallows exception stack trace
+
 
 
 New Feature
@@ -200,6 +222,13 @@ List of resolved issues:
 
 [FLINK-20529] - 
Publish Dockerfiles for release 1.12.0
 
+[FLINK-20534] - 
Add Flink 1.12 MigrationVersion
+
+[FLINK-20536] - 
Update migration tests in master to cover migration from release-1.12
+
 [FLINK-20960] - 
Add warning in 1.12 release notes about potential corrupt data stream with 
unaligned checkpoint 
 
+[FLINK-21358] - 
Missing snapshot version compatibility for 1.12
+
 
+



[flink-web] branch asf-site updated (c931437 -> fb9a6ea)

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from c931437  Missing ending doublequote for defaultFlinkVersion.
 new ee40eaf  Add Apache Flink 1.12.2 release
 new 0859196  RC#2 update
 new d07631c  Set 1.12.2 release_date to 2021-03-03
 new fb9a6ea  Rebuild website (release-1.12.2)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 _config.yml |  42 +++---
 _posts/2021-03-03-release-1.12.2.md | 234 
 content/blog/feed.xml   | 256 
 content/blog/index.html |  38 --
 content/blog/page10/index.html  |  38 +++---
 content/blog/page11/index.html  |  40 +++---
 content/blog/page12/index.html  |  40 +++---
 content/blog/page13/index.html  |  40 +++---
 content/blog/page14/index.html  |  40 +++---
 content/blog/page15/index.html  |  25 
 content/blog/page2/index.html   |  38 +++---
 content/blog/page3/index.html   |  41 +++---
 content/blog/page4/index.html   |  39 --
 content/blog/page5/index.html   |  36 +++--
 content/blog/page6/index.html   |  36 +++--
 content/blog/page7/index.html   |  36 +++--
 content/blog/page8/index.html   |  38 +++---
 content/blog/page9/index.html   |  38 --
 content/downloads.html  |  33 +++--
 content/index.html  |   8 +-
 content/q/gradle-quickstart.sh  |   2 +-
 content/q/quickstart-scala.sh   |   2 +-
 content/q/quickstart.sh |   2 +-
 content/q/sbt-quickstart.sh |   2 +-
 content/zh/downloads.html   |  37 --
 content/zh/index.html   |   8 +-
 q/gradle-quickstart.sh  |   2 +-
 q/quickstart-scala.sh   |   2 +-
 q/quickstart.sh |   2 +-
 q/sbt-quickstart.sh |   2 +-
 30 files changed, 912 insertions(+), 285 deletions(-)
 create mode 100644 _posts/2021-03-03-release-1.12.2.md



[flink-web] 03/04: Set 1.12.2 release_date to 2021-03-03

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit d07631c5f44a53e6954f45eb394e4ad0ea8bd7f1
Author: Roman Khachatryan 
AuthorDate: Wed Mar 3 15:20:33 2021 +0100

Set 1.12.2 release_date to 2021-03-03
---
 _config.yml   | 2 +-
 _posts/{2021-02-27-release-1.12.2.md => 2021-03-03-release-1.12.2.md} | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/_config.yml b/_config.yml
index 260d918..945b9b6 100644
--- a/_config.yml
+++ b/_config.yml
@@ -207,7 +207,7 @@ release_archive:
 flink:
   - version_short: "1.12"
 version_long: 1.12.2
-release_date: 2021-02-27
+release_date: 2021-03-03
   -
 version_short: "1.12"
 version_long: 1.12.1
diff --git a/_posts/2021-02-27-release-1.12.2.md 
b/_posts/2021-03-03-release-1.12.2.md
similarity index 99%
rename from _posts/2021-02-27-release-1.12.2.md
rename to _posts/2021-03-03-release-1.12.2.md
index 68eaaa2..b71cb0f 100644
--- a/_posts/2021-02-27-release-1.12.2.md
+++ b/_posts/2021-03-03-release-1.12.2.md
@@ -1,7 +1,7 @@
 ---
 layout: post
 title:  "Apache Flink 1.12.2 Released"
-date:   2021-02-27 00:00:00
+date:   2021-03-03 00:00:00
 categories: news
 authors:
 - yuan:



[flink] branch master updated: [hotfix][docs] Fix merge conflicts of checkpointing.md

2021-03-03 Thread sjwiesman
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 951e2a5  [hotfix][docs] Fix merge conflicts of checkpointing.md
951e2a5 is described below

commit 951e2a53f209a4bbd9ac21989086363365c2e737
Author: Yun Tang 
AuthorDate: Wed Mar 3 22:48:10 2021 +0800

[hotfix][docs] Fix merge conflicts of checkpointing.md

This closes #15079
---
 .../docs/dev/datastream/fault-tolerance/checkpointing.md| 13 ++---
 1 file changed, 2 insertions(+), 11 deletions(-)

diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index 807cbf0..c1e6297 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -50,12 +50,8 @@ By default, checkpointing is disabled. To enable 
checkpointing, call `enableChec
 
 Other parameters for checkpointing include:
 
-<<< HEAD
-  - *checkpoint storage*: You can set the location where checkpoints snapshots 
are made durable. By default Flink will use the JobManager's heap. For 
production deployments it is recomended to instead use a durable filesystem. 
See [checkpoint storage]({{< ref 
"docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the 
available options for job-wide and cluster-wide configuration.
-===
   - *checkpoint storage*: You can set the location where checkpoint snapshots 
are made durable. By default Flink will use the JobManager's heap. For 
production deployments it is recomended to instead use a durable filesystem. 
See [checkpoint storage]({{< ref 
"docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the 
available options for job-wide and cluster-wide configuration.
->>> 2c620f19df5d81be1f665b5a90439c185e0fd3b7
- 
+
   - *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
 Exactly-once is preferable for most applications. At-least-once may be 
relevant for certain super-low-latency (consistently few milliseconds) 
applications.
 
@@ -202,14 +198,9 @@ Where the checkpoints are stored (e.g., JobManager memory, 
file system, database
 **Checkpoint Storage**. 
 
 By default, checkpoints are stored in memory in the JobManager. For proper 
persistence of large state,
-Flink supports various approaches for checkpointing state in other locations. 
-<<< HEAD
-The choice of checkpoint storag ecan be configured via 
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.
-It is highly encouraged that checkpoints are stored in a highly-available 
filesystem for most production deployments. 
-===
+Flink supports various approaches for checkpointing state in other locations.
 The choice of checkpoint storage can be configured via 
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.
 It is strongly encouraged that checkpoints be stored in a highly-available 
filesystem for production deployments. 
->>> 2c620f19df5d81be1f665b5a90439c185e0fd3b7
 
 See [checkpoint storage]({{< ref 
"docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the 
available options for job-wide and cluster-wide configuration.
 



svn commit: r46446 - /dev/flink/flink-1.12.2-rc2/ /release/flink/flink-1.12.2/

2021-03-03 Thread pnowojski
Author: pnowojski
Date: Wed Mar  3 14:13:50 2021
New Revision: 46446

Log:
Release Flink 1.12.2

Added:
release/flink/flink-1.12.2/
  - copied from r46445, dev/flink/flink-1.12.2-rc2/
Removed:
dev/flink/flink-1.12.2-rc2/



[flink] branch master updated: [FLINK-21581][runtime] Mark RuntimeContext.getJobId @PublicEvolving

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c60a31f  [FLINK-21581][runtime] Mark RuntimeContext.getJobId 
@PublicEvolving
c60a31f is described below

commit c60a31fd70dc985ab943c105f24b941ff42ab082
Author: Roman Khachatryan 
AuthorDate: Wed Mar 3 10:52:15 2021 +0100

[FLINK-21581][runtime] Mark RuntimeContext.getJobId @PublicEvolving

The JobID added in FLINK-21570 is
1) a breaking change
2) may be changed after removing DataSet API
---
 .../main/java/org/apache/flink/api/common/functions/RuntimeContext.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 11c9198..89debd0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -63,6 +63,7 @@ public interface RuntimeContext {
  * standalone collection executor). Note that Job ID can change in 
particular upon manual
  * restart. The returned ID should NOT be used for any job management 
tasks.
  */
+@PublicEvolving
 Optional<JobID> getJobId();
 
 /**



[flink] branch master updated (044f8e1 -> 3148b82)

2021-03-03 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 044f8e1  [FLINK-21115][python] Add AggregatingState and corresponding 
StateDescriptor for Python DataStream API
 add 3148b82  [FLINK-21475][decl-scheduler] Support 
StateWithExecutionGraph.suspend when EG has reached globally terminal state

No new revisions were added by this update.

Summary of changes:
 .../adaptive/StateWithExecutionGraph.java  |   2 +-
 .../adaptive/StateWithExecutionGraphTest.java  | 148 +
 2 files changed, 149 insertions(+), 1 deletion(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java



[flink] branch release-1.12 updated (8be24ee -> 591cd3c)

2021-03-03 Thread weizhong
This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 8be24ee  [FLINK-21497][coordination] Only complete leader future with 
valid leader
 add 591cd3c  [FLINK-21434][python] Fix encoding error when using the fast 
coder to encode a row-type field containing more than 14 fields

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/fn_execution/coder_impl_fast.pyx   | 4 ++--
 flink-python/pyflink/fn_execution/tests/test_fast_coders.py | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)



[flink] branch master updated: [FLINK-21115][python] Add AggregatingState and corresponding StateDescriptor for Python DataStream API

2021-03-03 Thread weizhong
This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 044f8e1  [FLINK-21115][python] Add AggregatingState and corresponding 
StateDescriptor for Python DataStream API
044f8e1 is described below

commit 044f8e17237d0fdbff96ca973817c932a3c985ba
Author: Wei Zhong 
AuthorDate: Wed Mar 3 20:54:52 2021 +0800

[FLINK-21115][python] Add AggregatingState and corresponding 
StateDescriptor for Python DataStream API

This closes #15028.
---
 flink-python/pyflink/datastream/functions.py   |  92 -
 flink-python/pyflink/datastream/state.py   |  42 
 .../pyflink/datastream/tests/test_data_stream.py   |  49 -
 flink-python/pyflink/fn_execution/operations.py|   8 +-
 flink-python/pyflink/fn_execution/state_impl.py| 109 +
 5 files changed, 254 insertions(+), 46 deletions(-)

diff --git a/flink-python/pyflink/datastream/functions.py 
b/flink-python/pyflink/datastream/functions.py
index 60e718f..b6952ce 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -23,7 +23,8 @@ from typing import Union, Any, Dict
 from py4j.java_gateway import JavaObject
 
 from pyflink.datastream.state import ValueState, ValueStateDescriptor, 
ListStateDescriptor, \
-ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, 
ReducingState
+ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, 
ReducingState, \
+AggregatingStateDescriptor, AggregatingState
 from pyflink.datastream.time_domain import TimeDomain
 from pyflink.datastream.timerservice import TimerService
 from pyflink.java_gateway import get_gateway
@@ -35,6 +36,7 @@ __all__ = [
 'FlatMapFunction',
 'CoFlatMapFunction',
 'ReduceFunction',
+'AggregateFunction',
 'KeySelector',
 'FilterFunction',
 'Partitioner',
@@ -156,6 +158,17 @@ class RuntimeContext(object):
 """
 pass
 
+def get_aggregating_state(
+self, state_descriptor: AggregatingStateDescriptor) -> 
AggregatingState:
+"""
+Gets a handle to the system's key/value aggregating state. This state 
is similar to the
+state accessed via get_state(ValueStateDescriptor), but is optimized 
for state that
+aggregates values with different types.
+
+This state is only accessible if the function is executed on a 
KeyedStream.
+"""
+pass
+
 
 class Function(abc.ABC):
 """
@@ -342,6 +355,83 @@ class ReduceFunction(Function):
 pass
 
 
+class AggregateFunction(Function):
+"""
+The AggregateFunction is a flexible aggregation function, characterized by 
the following
+features:
+
+- The aggregates may use different types for input values, 
intermediate aggregates, and
+  result type, to support a wide range of aggregation types.
+- Support for distributive aggregations: Different intermediate 
aggregates can be merged
+  together, to allow for pre-aggregation/final-aggregation 
optimizations.
+
+The AggregateFunction's intermediate aggregate (in-progress aggregation 
state) is called the
+`accumulator`. Values are added to the accumulator, and final aggregates 
are obtained by
+finalizing the accumulator state. This supports aggregation functions 
where the intermediate
+state needs to be different than the aggregated values and the final 
result type, such as for
+example average (which typically keeps a count and sum). Merging 
intermediate aggregates
+(partial aggregates) means merging the accumulators.
+
+The AggregationFunction itself is stateless. To allow a single 
AggregationFunction instance to
+maintain multiple aggregates (such as one aggregate per key), the 
AggregationFunction creates a
+new accumulator whenever a new aggregation is started.
+"""
+
+@abc.abstractmethod
+def create_accumulator(self):
+"""
+Creates a new accumulator, starting a new aggregate.
+
+The new accumulator is typically meaningless unless a value is added 
via
+:func:`~AggregateFunction.add`.
+
+The accumulator is the state of a running aggregation. When a program 
has multiple
+aggregates in progress (such as per key and window), the state (per 
key and window) is the
+size of the accumulator.
+
+:return: A new accumulator, corresponding to an empty aggregate.
+"""
+pass
+
+@abc.abstractmethod
+def add(self, value, accumulator):
+"""
+Adds the given input value to the given accumulator, returning the new 
accumulator value.
+
+For efficiency, the input accumulator may be modified and returned.
+
+:param value: The value to add.
+:param accumulator: The 

[flink] branch master updated (7602d2e -> 1b0e592)

2021-03-03 Thread weizhong
This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 7602d2e  [FLINK-21502][coordination] Reduce frequency of global 
re-allocate resources
 add 1b0e592  [FLINK-21434][python] Fix encoding error when using the fast 
coder to encode a row-type field containing more than 14 fields

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/fn_execution/coder_impl_fast.pyx   | 4 ++--
 flink-python/pyflink/fn_execution/tests/test_fast_coders.py | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)



[flink] 01/01: [hotfix][docs] wrong brackets in CREATE VIEW statement

2021-03-03 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch NicoK-hotfix-docs-create-view
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8eab5f240b1dabeab626fbebde24b6eaddc6e0c
Author: Nico Kruber 
AuthorDate: Wed Mar 3 13:32:06 2021 +0100

[hotfix][docs] wrong brackets in CREATE VIEW statement
---
 docs/content/docs/dev/table/sql/create.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/content/docs/dev/table/sql/create.md 
b/docs/content/docs/dev/table/sql/create.md
index 0e7e2e9..9113ba7 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -559,7 +559,7 @@ The key and value of expression `key1=val1` should both be 
string literal.
 ## CREATE VIEW
 ```sql
 CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
-  [{columnName [, columnName ]* }] [COMMENT view_comment]
+  [( columnName [, columnName ]* )] [COMMENT view_comment]
   AS query_expression
 ```
 



[flink] branch NicoK-hotfix-docs-create-view created (now b8eab5f)

2021-03-03 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch NicoK-hotfix-docs-create-view
in repository https://gitbox.apache.org/repos/asf/flink.git.


  at b8eab5f  [hotfix][docs] wrong brackets in CREATE VIEW statement

This branch includes the following new commits:

 new b8eab5f  [hotfix][docs] wrong brackets in CREATE VIEW statement

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink-statefun] branch master updated: [hotfix] Fix README.md images

2021-03-03 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
 new 1bea597  [hotfix] Fix README.md images
1bea597 is described below

commit 1bea5976fbf6316fb65ad8e2fd3e4705179e4ee6
Author: Patrick Lucas 
AuthorDate: Wed Mar 3 10:08:54 2021 +0100

[hotfix] Fix README.md images

The docs content was reorganized, breaking the image links in README.md.

This closes #207.
---
 README.md | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 9162c75..0d29bce 100755
--- a/README.md
+++ b/README.md
@@ -1,10 +1,10 @@
-
+
 
 Stateful Functions is an API that simplifies the building of **distributed 
stateful applications** with a **runtime built for serverless architectures**.
 It brings together the benefits of stateful stream processing - the processing 
of large datasets with low latency and bounded resource constraints -
 along with a runtime for modeling stateful entities that supports location 
transparency, concurrency, scaling, and resiliency. 
 
-
+
 
 It is designed to work with modern architectures, like cloud-native 
deployments and popular event-driven FaaS platforms 
 like AWS Lambda and KNative, and to provide out-of-the-box consistent state 
and messaging while preserving the serverless
@@ -46,7 +46,7 @@ A Stateful Functions application consists of the following 
primitives: stateful
 routers and egresses.
 
 
-  
+  
 
 
  Stateful functions
@@ -99,7 +99,7 @@ into the same larger application.
 The Stateful Functions runtime is designed to provide a set of properties 
similar to what characterizes [serverless 
functions](https://martinfowler.com/articles/serverless.html), but applied to 
stateful problems.
 
 
-  
+  
 
 
 The runtime is built on Apache Flink®, with the following design 
principles:
@@ -235,4 +235,4 @@ You can learn more about how to contribute in the [Apache 
Flink website](https:/
 
 ## License
 
-The code in this repository is licensed under the [Apache Software License 
2](LICENSE).
\ No newline at end of file
+The code in this repository is licensed under the [Apache Software License 
2](LICENSE).



[flink] annotated tag release-1.12.2 updated (4dedee0 -> 6310858)

2021-03-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to annotated tag release-1.12.2
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.12.2 was modified! ***

from 4dedee0  (commit)
  to 6310858  (tag)
 tagging 4e55bbc2f5f3ed3c08a552b3bd253bb7f3543c6f (tag)
  length 834 bytes
  by Roman Khachatryan
  on Tue Mar 2 19:42:26 2021 +0100

- Log -
release-1.12.2 (from approved release-1.12.2-rc2)
-BEGIN PGP SIGNATURE-

iQGzBAABCgAdFiEEDVRfJk0t/ev9TgOPl7RiXi/PUXwFAmA+hzQACgkQl7RiXi/P
UXxMmgwAoJitMK/isrI18PO5HeS0rz+YBtWK3oRNrVVKBLmrHF7YzlmLM5sZUOU+
SssNQ1JE5g3HLymyhKzfuly2h5qp5/rmHmVWv1Pocy3+rhfFCfeBUExk0yr2aIYR
XRavYdV67JUGmQYQD+qmzkHMV5KIX2s8u8LvmwGyhyk/b49s/lREsxXvony3TV1n
xcVsn8VK9kUExBWcxy8yz47hurymmc99vcz+fYmZO7HbZgzLKkU+XCxv45ZbSifv
w4tJ18AEaxLmzrjVhuIcyhQGJPt1GntEIiUGQFC2wVUL2+k6Hc1mZQom+m4ZXVDQ
SCey3VlDlt44+nVNUBdwheO4WgcaRobAfkhuIMaURcnaRW7+b8dpWxTBfi7DIfpR
70DNu/EYqCz3bGvZ4yVsjnH7kNFtPuQAVnILk2rAEaqUg/BGa1TdPlohMTknizYr
DETcU3MIihTNpXcspuT4GAEjdO13XQiOxn73pNbJHO93M34VYIFhsP4mn/5NN5AC
5yZprARS
=2K8X
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



[flink] branch release-1.11 updated: [FLINK-21497][coordination] Only complete leader future with valid leader

2021-03-03 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new b61e01a  [FLINK-21497][coordination] Only complete leader future with 
valid leader
b61e01a is described below

commit b61e01ac1644151ba02a4833181adb1383b1a9da
Author: Chesnay Schepler 
AuthorDate: Thu Feb 25 14:27:46 2021 +0100

[FLINK-21497][coordination] Only complete leader future with valid leader
---
 .../leaderretrieval/LeaderRetrievalListener.java   |  3 ++
 .../resourcemanager/JobLeaderIdService.java| 29 +--
 .../resourcemanager/JobLeaderIdServiceTest.java| 56 ++
 3 files changed, 83 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
index d485241..da89ecc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
@@ -31,6 +31,9 @@ public interface LeaderRetrievalListener {
 /**
  * This method is called by the {@link LeaderRetrievalService} when a new 
leader is elected.
  *
+ * If both arguments are null then it signals that leadership was 
revoked without a new
+ * leader having been elected.
+ *
  * @param leaderAddress The address of the new leader
  * @param leaderSessionID The new leader session ID
  */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 25a9889..8dec9d3 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -267,10 +267,9 @@ public class JobLeaderIdService {
 }
 
 @Override
-public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionId) {
+public void notifyLeaderAddress(
+@Nullable String leaderAddress, @Nullable UUID 
leaderSessionId) {
 if (running) {
-LOG.debug("Found a new job leader {}@{}.", leaderSessionId, 
leaderAddress);
-
 UUID previousJobLeaderId = null;
 
 if (leaderIdFuture.isDone()) {
@@ -281,9 +280,29 @@ public class JobLeaderIdService {
 handleError(e);
 }
 
-leaderIdFuture = 
CompletableFuture.completedFuture(leaderSessionId);
+if (leaderSessionId == null) {
+// there was a leader, but we no longer have one
+LOG.debug("Job {} no longer has a job leader.", jobId);
+leaderIdFuture = new CompletableFuture<>();
+} else {
+// there was an active leader, but we now have a new 
leader
+LOG.debug(
+"Job {} has a new job leader {}@{}.",
+jobId,
+leaderSessionId,
+leaderAddress);
+leaderIdFuture = 
CompletableFuture.completedFuture(leaderSessionId);
+}
 } else {
-leaderIdFuture.complete(leaderSessionId);
+if (leaderSessionId != null) {
+// there was no active leader, but we now have a new 
leader
+LOG.debug(
+"Job {} has a new job leader {}@{}.",
+jobId,
+leaderSessionId,
+leaderAddress);
+leaderIdFuture.complete(leaderSessionId);
+}
 }
 
 if (previousJobLeaderId != null && 
!previousJobLeaderId.equals(leaderSessionId)) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index f06d7a4..02032bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import 

[flink] branch release-1.12 updated: [FLINK-21497][coordination] Only complete leader future with valid leader

2021-03-03 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
 new 8be24ee  [FLINK-21497][coordination] Only complete leader future with 
valid leader
8be24ee is described below

commit 8be24ee5c8b750765c0da0558f4928497d09a651
Author: Chesnay Schepler 
AuthorDate: Thu Feb 25 14:27:46 2021 +0100

[FLINK-21497][coordination] Only complete leader future with valid leader
---
 .../leaderretrieval/LeaderRetrievalListener.java   |  3 ++
 .../resourcemanager/JobLeaderIdService.java| 29 +--
 .../resourcemanager/JobLeaderIdServiceTest.java| 56 ++
 3 files changed, 83 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
index d485241..da89ecc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
@@ -31,6 +31,9 @@ public interface LeaderRetrievalListener {
 /**
  * This method is called by the {@link LeaderRetrievalService} when a new 
leader is elected.
  *
+ * <p>If both arguments are null then it signals that leadership was 
revoked without a new
+ * leader having been elected.
+ *
  * @param leaderAddress The address of the new leader
  * @param leaderSessionID The new leader session ID
  */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 25a9889..8dec9d3 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -267,10 +267,9 @@ public class JobLeaderIdService {
 }
 
 @Override
-public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionId) {
+public void notifyLeaderAddress(
+@Nullable String leaderAddress, @Nullable UUID 
leaderSessionId) {
 if (running) {
-LOG.debug("Found a new job leader {}@{}.", leaderSessionId, 
leaderAddress);
-
 UUID previousJobLeaderId = null;
 
 if (leaderIdFuture.isDone()) {
@@ -281,9 +280,29 @@ public class JobLeaderIdService {
 handleError(e);
 }
 
-leaderIdFuture = 
CompletableFuture.completedFuture(leaderSessionId);
+if (leaderSessionId == null) {
+// there was a leader, but we no longer have one
+LOG.debug("Job {} no longer has a job leader.", jobId);
+leaderIdFuture = new CompletableFuture<>();
+} else {
+// there was an active leader, but we now have a new 
leader
+LOG.debug(
+"Job {} has a new job leader {}@{}.",
+jobId,
+leaderSessionId,
+leaderAddress);
+leaderIdFuture = 
CompletableFuture.completedFuture(leaderSessionId);
+}
 } else {
-leaderIdFuture.complete(leaderSessionId);
+if (leaderSessionId != null) {
+// there was no active leader, but we now have a new 
leader
+LOG.debug(
+"Job {} has a new job leader {}@{}.",
+jobId,
+leaderSessionId,
+leaderAddress);
+leaderIdFuture.complete(leaderSessionId);
+}
 }
 
 if (previousJobLeaderId != null && 
!previousJobLeaderId.equals(leaderSessionId)) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index f06d7a4..02032bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import