[2/2] flink git commit: [hotfix][docs] Fix broken redirect and liquid syntax problem

2017-01-30 Thread aljoscha
[hotfix][docs] Fix broken redirect and liquid syntax problem


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65b1da8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65b1da8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65b1da8c

Branch: refs/heads/release-1.2
Commit: 65b1da8c532304142b763e9de5d00932ae438144
Parents: 0666786
Author: David Anderson 
Authored: Mon Jan 30 15:29:02 2017 +0100
Committer: Aljoscha Krettek 
Committed: Mon Jan 30 17:59:25 2017 +0100

--
 docs/redirects/state.md  | 2 +-
 docs/setup/savepoints.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/65b1da8c/docs/redirects/state.md
--
diff --git a/docs/redirects/state.md b/docs/redirects/state.md
index 7a74677..15869ba 100644
--- a/docs/redirects/state.md
+++ b/docs/redirects/state.md
@@ -1,7 +1,7 @@
 ---
 title: "Working with State"
 layout: redirect
-redirect: /dev/state.html
+redirect: /dev/stream/state.html
 permalink: /apis/streaming/state.html
 ---
 

[1/2] flink git commit: [FLINK-5456] Resurrect and update parts of the state intro documentation

2017-01-30 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.2 c365a34b8 -> 65b1da8c5


[FLINK-5456] Resurrect and update parts of the state intro documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0666786a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0666786a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0666786a

Branch: refs/heads/release-1.2
Commit: 0666786ad87d3befd248cf7b8c59e686ac29af8e
Parents: c365a34
Author: David Anderson 
Authored: Wed Jan 18 15:56:18 2017 +0100
Committer: Aljoscha Krettek 
Committed: Mon Jan 30 17:59:14 2017 +0100

--
 docs/dev/stream/state.md | 332 +++---
 1 file changed, 314 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0666786a/docs/dev/stream/state.md
--
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0b38a62..124ce68 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -26,11 +26,11 @@ under the License.
 {:toc}
 
 Stateful functions and operators store data across the processing of 
individual elements/events, making state a critical building block for
-any type of more elaborate operation. For example: 
+any type of more elaborate operation. For example:
 
   - When an application searches for certain event patterns, the state will 
store the sequence of events encountered so far.
   - When aggregating events per minute, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the 
state holds the current verstion of the model parameters.
+  - When training a machine learning model over a stream of data points, the 
state holds the current version of the model parameters.
 
 In order to make state fault tolerant, Flink needs to be aware of the state 
and [checkpoint](checkpointing.html) it.
 In many cases, Flink can also *manage* the state for the application, meaning 
Flink deals with the memory management (possibly spilling to disk
@@ -39,40 +39,336 @@ if necessary) to allow applications to hold very large 
state.
 This document explains how to use Flink's state abstractions when developing 
an application.
 
 
-## Keyed State and Operator state
+## Keyed State and Operator State
 
-There are two basic state backends: `Keyed State` and `Operator State`.
+There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
 
- Keyed State
+### Keyed State
 
 *Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can 
create in a function on a `KeyedStream`, as
-well as the state of a keyed window operator.
 
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by 
which keyed state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key 
groups.
+You can think of Keyed State as Operator State that has been partitioned,
+or sharded, with exactly one state-partition per key.
+Each keyed-state is logically bound to a unique
+composite of , and since each key
+"belongs" to exactly one parallel instance of a keyed operator, we can
+think of this simply as .
 
- Operator State
+Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
+atomic unit by which Flink can redistribute Keyed State;
+there are exactly as many Key Groups as the defined maximum parallelism.
+During execution each parallel instance of a keyed operator works with the keys
+for one or more Key Groups.
 
-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` 
interface in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic 
sugar) for the Operator State.
+### Operator State
 
-Operator State needs special re-distribution schemes when parallelism is 
changed. There can be different variations of such
-schemes; the following are currently defined:
+With *Operator State* (or *non-keyed state*), each operator state is
+bound to one parallel operator instance.
+The Kafka source connector is a good motivating example for the use of 
Operator State
+in Flink. Each parallel instance of this Kafka consumer maintains a map
+of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among
+parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution; the 

[2/2] flink git commit: [FLINK-5456] Resurrect and update parts of the state intro documentation

2017-01-30 Thread aljoscha
[FLINK-5456] Resurrect and update parts of the state intro documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d27b9fee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d27b9fee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d27b9fee

Branch: refs/heads/master
Commit: d27b9fee5f21997505ad3434f46e5ff1f4e225ed
Parents: ec3eb59
Author: David Anderson 
Authored: Wed Jan 18 15:56:18 2017 +0100
Committer: Aljoscha Krettek 
Committed: Mon Jan 30 17:58:27 2017 +0100

--
 docs/dev/stream/state.md | 332 +++---
 1 file changed, 314 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d27b9fee/docs/dev/stream/state.md
--
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0b38a62..124ce68 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -26,11 +26,11 @@ under the License.
 {:toc}
 
 Stateful functions and operators store data across the processing of 
individual elements/events, making state a critical building block for
-any type of more elaborate operation. For example: 
+any type of more elaborate operation. For example:
 
   - When an application searches for certain event patterns, the state will 
store the sequence of events encountered so far.
   - When aggregating events per minute, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the 
state holds the current verstion of the model parameters.
+  - When training a machine learning model over a stream of data points, the 
state holds the current version of the model parameters.
 
 In order to make state fault tolerant, Flink needs to be aware of the state 
and [checkpoint](checkpointing.html) it.
 In many cases, Flink can also *manage* the state for the application, meaning 
Flink deals with the memory management (possibly spilling to disk
@@ -39,40 +39,336 @@ if necessary) to allow applications to hold very large 
state.
 This document explains how to use Flink's state abstractions when developing 
an application.
 
 
-## Keyed State and Operator state
+## Keyed State and Operator State
 
-There are two basic state backends: `Keyed State` and `Operator State`.
+There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
 
- Keyed State
+### Keyed State
 
 *Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can 
create in a function on a `KeyedStream`, as
-well as the state of a keyed window operator.
 
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by 
which keyed state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key 
groups.
+You can think of Keyed State as Operator State that has been partitioned,
+or sharded, with exactly one state-partition per key.
+Each keyed-state is logically bound to a unique
+composite of , and since each key
+"belongs" to exactly one parallel instance of a keyed operator, we can
+think of this simply as .
 
- Operator State
+Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
+atomic unit by which Flink can redistribute Keyed State;
+there are exactly as many Key Groups as the defined maximum parallelism.
+During execution each parallel instance of a keyed operator works with the keys
+for one or more Key Groups.
 
-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` 
interface in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic 
sugar) for the Operator State.
+### Operator State
 
-Operator State needs special re-distribution schemes when parallelism is 
changed. There can be different variations of such
-schemes; the following are currently defined:
+With *Operator State* (or *non-keyed state*), each operator state is
+bound to one parallel operator instance.
+The Kafka source connector is a good motivating example for the use of 
Operator State
+in Flink. Each parallel instance of this Kafka consumer maintains a map
+of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among
+parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution; the following are currently 
defined:
 
   - **List-style redistribution:** Each operator 

flink git commit: [FLINK-5681] [runtime] Make ReaperThread for SafetyNetCloseableRegistry a singleton

2017-01-30 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.2 3e2e49fd9 -> c365a34b8


[FLINK-5681] [runtime] Make ReaperThread for SafetyNetCloseableRegistry a 
singleton

This closes #3230


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c365a34b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c365a34b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c365a34b

Branch: refs/heads/release-1.2
Commit: c365a34b83e913d9c6c509627fae784435b056a2
Parents: 3e2e49f
Author: Stefan Richter 
Authored: Fri Jan 27 19:51:21 2017 +0100
Committer: Stephan Ewen 
Committed: Mon Jan 30 16:47:19 2017 +0100

--
 .../core/fs/SafetyNetCloseableRegistry.java | 122 ---
 .../core/fs/SafetyNetCloseableRegistryTest.java |  66 +++---
 2 files changed, 128 insertions(+), 60 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c365a34b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index de4fb30..8b28fa2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.AbstractCloseableRegistry;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingProxyUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
 import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -45,19 +46,35 @@ import java.util.Map;
  * 
  * All methods in this class are thread-safe.
  */
+@Internal
 public class SafetyNetCloseableRegistry extends
AbstractCloseableRegistry {
 
private static final Logger LOG = 
LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
-   private final ReferenceQueue referenceQueue;
-   private final Thread reaperThread;
+
+   /** Lock for accessing reaper thread and registry count */
+   private static final Object REAPER_THREAD_LOCK = new Object();
+
+   /** Singleton reaper thread takes care of all registries in VM */
+   @GuardedBy("REAPER_THREAD_LOCK")
+   private static CloseableReaperThread REAPER_THREAD = null;
+
+   /** Global count of all instances of SafetyNetCloseableRegistry */
+   @GuardedBy("REAPER_THREAD_LOCK")
+   private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
 
public SafetyNetCloseableRegistry() {
super(new IdentityHashMap());
-   this.referenceQueue = new ReferenceQueue<>();
-   this.reaperThread = new CloseableReaperThread();
-   reaperThread.start();
+
+   synchronized (REAPER_THREAD_LOCK) {
+   if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
+   Preconditions.checkState(null == REAPER_THREAD);
+   REAPER_THREAD = new CloseableReaperThread();
+   REAPER_THREAD.start();
+   }
+   ++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
+   }
}
 
@Override
@@ -65,14 +82,18 @@ public class SafetyNetCloseableRegistry extends
WrappingProxyCloseable 
wrappingProxyCloseable,
Map 
closeableMap) throws IOException {
 
+   assert Thread.holdsLock(getSynchronizationLock());
+
Closeable innerCloseable = 
WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
 
if (null == innerCloseable) {
return;
}
 
-   PhantomDelegatingCloseableRef phantomRef =
-   new 
PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+   PhantomDelegatingCloseableRef phantomRef = new 

flink git commit: [FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

2017-01-30 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.2 f523deca8 -> 3e2e49fd9


[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

This closes #3215.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e2e49fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e2e49fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e2e49fd

Branch: refs/heads/release-1.2
Commit: 3e2e49fd959bc185770b64fa89240f6a7ec80f02
Parents: f523dec
Author: Ufuk Celebi 
Authored: Wed Jan 25 15:42:24 2017 +0100
Committer: Ufuk Celebi 
Committed: Mon Jan 30 16:33:52 2017 +0100

--
 .../checkpoints/CheckpointConfigHandler.java|  8 +-
 .../CheckpointStatsDetailsHandler.java  |  7 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  7 +-
 .../checkpoints/CheckpointStatsHandler.java |  7 +-
 .../CheckpointConfigHandlerTest.java| 13 +--
 .../CheckpointStatsDetailsHandlerTest.java  | 13 +--
 .../checkpoints/CheckpointStatsHandlerTest.java |  5 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +--
 .../checkpoint/AbstractCheckpointStats.java |  5 +-
 .../checkpoint/CheckpointStatsHistory.java  |  4 +-
 .../checkpoint/CheckpointStatsTracker.java  | 16 ++--
 .../checkpoint/CompletedCheckpointStats.java| 26 +++---
 .../checkpoint/FailedCheckpointStats.java   | 24 +++---
 .../checkpoint/PendingCheckpointStats.java  |  4 +-
 .../checkpoint/RestoredCheckpointStats.java |  2 +-
 .../runtime/checkpoint/SubtaskStateStats.java   |  8 +-
 .../runtime/checkpoint/TaskStateStats.java  |  9 ++-
 .../executiongraph/AccessExecutionGraph.java| 20 +++--
 .../executiongraph/ArchivedExecutionGraph.java  | 54 -
 .../runtime/executiongraph/ExecutionGraph.java  | 22 -
 .../tasks/ExternalizedCheckpointSettings.java   |  2 +
 .../checkpoint/CheckpointStatsHistoryTest.java  |  1 -
 .../checkpoint/CheckpointStatsSnapshotTest.java | 84 
 .../checkpoint/CompletedCheckpointTest.java | 35 
 .../checkpoint/FailedCheckpointStatsTest.java   | 40 ++
 .../checkpoint/PendingCheckpointStatsTest.java  | 38 +++--
 .../checkpoint/SubtaskStateStatsTest.java   | 36 +
 .../runtime/checkpoint/TaskStateStatsTest.java  | 46 ++-
 .../ArchivedExecutionGraphTest.java |  7 +-
 .../tasks/JobSnapshottingSettingsTest.java  | 59 ++
 30 files changed, 490 insertions(+), 125 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 1ad5e65..be0d283 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandle
@Override
public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception {
StringWriter writer = new StringWriter();
+
JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+   JobSnapshottingSettings settings = 
graph.getJobSnapshottingSettings();
 
-   CheckpointStatsTracker tracker = 
graph.getCheckpointStatsTracker();
-   JobSnapshottingSettings settings = 
tracker.getSnapshottingSettings();
+   if (settings == null) {
+   return "{}";
+   }
 
gen.writeStartObject();
{

http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
--
diff --git 

flink git commit: [FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

2017-01-30 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 126fb1779 -> dcfa3fbb0


[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

This closes #3215.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfa3fbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfa3fbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfa3fbb

Branch: refs/heads/master
Commit: dcfa3fbb0f17400ebf823e10f803cde8563fff4a
Parents: 126fb17
Author: Ufuk Celebi 
Authored: Wed Jan 25 15:42:24 2017 +0100
Committer: Ufuk Celebi 
Committed: Mon Jan 30 16:33:22 2017 +0100

--
 .../checkpoints/CheckpointConfigHandler.java|  8 +-
 .../CheckpointStatsDetailsHandler.java  |  7 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  7 +-
 .../checkpoints/CheckpointStatsHandler.java |  7 +-
 .../CheckpointConfigHandlerTest.java| 13 +--
 .../CheckpointStatsDetailsHandlerTest.java  | 13 +--
 .../checkpoints/CheckpointStatsHandlerTest.java |  5 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +--
 .../checkpoint/AbstractCheckpointStats.java |  5 +-
 .../checkpoint/CheckpointStatsHistory.java  |  4 +-
 .../checkpoint/CheckpointStatsTracker.java  | 16 ++--
 .../checkpoint/CompletedCheckpointStats.java| 26 +++---
 .../checkpoint/FailedCheckpointStats.java   | 24 +++---
 .../checkpoint/PendingCheckpointStats.java  |  4 +-
 .../checkpoint/RestoredCheckpointStats.java |  2 +-
 .../runtime/checkpoint/SubtaskStateStats.java   |  8 +-
 .../runtime/checkpoint/TaskStateStats.java  |  9 ++-
 .../executiongraph/AccessExecutionGraph.java| 20 +++--
 .../executiongraph/ArchivedExecutionGraph.java  | 54 -
 .../runtime/executiongraph/ExecutionGraph.java  | 22 -
 .../tasks/ExternalizedCheckpointSettings.java   |  2 +
 .../checkpoint/CheckpointStatsHistoryTest.java  |  1 -
 .../checkpoint/CheckpointStatsSnapshotTest.java | 84 
 .../checkpoint/CompletedCheckpointTest.java | 35 
 .../checkpoint/FailedCheckpointStatsTest.java   | 40 ++
 .../checkpoint/PendingCheckpointStatsTest.java  | 38 +++--
 .../checkpoint/SubtaskStateStatsTest.java   | 36 +
 .../runtime/checkpoint/TaskStateStatsTest.java  | 46 ++-
 .../ArchivedExecutionGraphTest.java |  7 +-
 .../tasks/JobSnapshottingSettingsTest.java  | 59 ++
 30 files changed, 490 insertions(+), 125 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 1ad5e65..be0d283 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandle
@Override
public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception {
StringWriter writer = new StringWriter();
+
JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+   JobSnapshottingSettings settings = 
graph.getJobSnapshottingSettings();
 
-   CheckpointStatsTracker tracker = 
graph.getCheckpointStatsTracker();
-   JobSnapshottingSettings settings = 
tracker.getSnapshottingSettings();
+   if (settings == null) {
+   return "{}";
+   }
 
gen.writeStartObject();
{

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
--
diff --git 

flink git commit: [FLINK-5678] [table] Fix User-defined Functions do not support all types of parameters

2017-01-30 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.2 617ff50c6 -> f523deca8


[FLINK-5678] [table] Fix User-defined Functions do not support all types of 
parameters

This closes #3233.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f523deca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f523deca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f523deca

Branch: refs/heads/release-1.2
Commit: f523deca8955257a6aeaafe4012a871307ea607a
Parents: 617ff50
Author: Jark Wu 
Authored: Sun Jan 29 18:36:28 2017 +0600
Committer: twalthr 
Committed: Mon Jan 30 14:56:44 2017 +0100

--
 .../codegen/calls/ScalarFunctionCallGen.scala   | 12 +++
 .../codegen/calls/TableFunctionCallGen.scala| 12 +++
 .../flink/table/expressions/literals.scala  |  5 +++
 .../utils/UserDefinedFunctionUtils.scala| 12 +++
 .../java/utils/UserDefinedScalarFunctions.java  | 36 
 .../java/utils/UserDefinedTableFunctions.java   | 31 +
 .../UserDefinedScalarFunctionTest.scala | 36 +++-
 .../dataset/DataSetCorrelateITCase.scala| 24 +
 8 files changed, 161 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f523deca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index ac840df..7ff18eb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.codegen.calls
 
+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils
 
 /**
   * Generates a call to user-defined [[ScalarFunction]].
@@ -52,6 +54,16 @@ class ScalarFunctionCallGen(
 .map { case (paramClass, operandExpr) =>
   if (paramClass.isPrimitive) {
 operandExpr
+  } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+  && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
+// we use primitives to represent temporal types internally, so no 
casting needed here
+val exprOrNull: String = if (codeGenerator.nullCheck) {
+  s"${operandExpr.nullTerm} ? null : " +
+s"(${paramClass.getCanonicalName}) ${operandExpr.resultTerm}"
+} else {
+  operandExpr.resultTerm
+}
+operandExpr.copy(resultTerm = exprOrNull)
   } else {
 val boxedTypeTerm = 
boxedTypeTermForTypeInfo(operandExpr.resultType)
 val boxedExpr = 
codeGenerator.generateOutputFieldBoxing(operandExpr)

http://git-wip-us.apache.org/repos/asf/flink/blob/f523deca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index 6e44f55..890b6bd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.codegen.calls
 
+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils
 
 /**
   * 

flink git commit: [FLINK-5678] [table] Fix User-defined Functions do not support all types of parameters

2017-01-30 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 50b665677 -> 126fb1779


[FLINK-5678] [table] Fix User-defined Functions do not support all types of 
parameters

This closes #3233.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/126fb177
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/126fb177
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/126fb177

Branch: refs/heads/master
Commit: 126fb1779b84a934dd6bb632c96a7666ac8c521e
Parents: 50b6656
Author: Jark Wu 
Authored: Sun Jan 29 18:36:28 2017 +0600
Committer: twalthr 
Committed: Mon Jan 30 14:51:57 2017 +0100

--
 .../codegen/calls/ScalarFunctionCallGen.scala   | 12 +++
 .../codegen/calls/TableFunctionCallGen.scala| 12 +++
 .../flink/table/expressions/literals.scala  |  5 +++
 .../utils/UserDefinedFunctionUtils.scala| 12 +++
 .../java/utils/UserDefinedScalarFunctions.java  | 36 
 .../java/utils/UserDefinedTableFunctions.java   | 31 +
 .../UserDefinedScalarFunctionTest.scala | 36 +++-
 .../dataset/DataSetCorrelateITCase.scala| 26 +-
 8 files changed, 162 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/126fb177/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index ac840df..7ff18eb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.codegen.calls
 
+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils
 
 /**
   * Generates a call to user-defined [[ScalarFunction]].
@@ -52,6 +54,16 @@ class ScalarFunctionCallGen(
 .map { case (paramClass, operandExpr) =>
   if (paramClass.isPrimitive) {
 operandExpr
+  } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+  && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
+// we use primitives to represent temporal types internally, so no 
casting needed here
+val exprOrNull: String = if (codeGenerator.nullCheck) {
+  s"${operandExpr.nullTerm} ? null : " +
+s"(${paramClass.getCanonicalName}) ${operandExpr.resultTerm}"
+} else {
+  operandExpr.resultTerm
+}
+operandExpr.copy(resultTerm = exprOrNull)
   } else {
 val boxedTypeTerm = 
boxedTypeTermForTypeInfo(operandExpr.resultType)
 val boxedExpr = 
codeGenerator.generateOutputFieldBoxing(operandExpr)

http://git-wip-us.apache.org/repos/asf/flink/blob/126fb177/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index 6e44f55..890b6bd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.codegen.calls
 
+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils
 
 /**
   * Generates a