[2/2] flink git commit: [hotfix][docs] Fix broken redirect and liquid syntax problem
[hotfix][docs] Fix broken redirect and liquid syntax problem

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65b1da8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65b1da8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65b1da8c

Branch: refs/heads/release-1.2
Commit: 65b1da8c532304142b763e9de5d00932ae438144
Parents: 0666786
Author: David Anderson
Authored: Mon Jan 30 15:29:02 2017 +0100
Committer: Aljoscha Krettek
Committed: Mon Jan 30 17:59:25 2017 +0100

--
 docs/redirects/state.md  | 2 +-
 docs/setup/savepoints.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/65b1da8c/docs/redirects/state.md
--

diff --git a/docs/redirects/state.md b/docs/redirects/state.md
index 7a74677..15869ba 100644
--- a/docs/redirects/state.md
+++ b/docs/redirects/state.md
@@ -1,7 +1,7 @@
 ---
 title: "Working with State"
 layout: redirect
-redirect: /dev/state.html
+redirect: /dev/stream/state.html
 permalink: /apis/streaming/state.html
 ---
[1/2] flink git commit: [FLINK-5456] Resurrect and update parts of the state intro documentation
Repository: flink
Updated Branches:
  refs/heads/release-1.2 c365a34b8 -> 65b1da8c5

[FLINK-5456] Resurrect and update parts of the state intro documentation

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0666786a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0666786a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0666786a

Branch: refs/heads/release-1.2
Commit: 0666786ad87d3befd248cf7b8c59e686ac29af8e
Parents: c365a34
Author: David Anderson
Authored: Wed Jan 18 15:56:18 2017 +0100
Committer: Aljoscha Krettek
Committed: Mon Jan 30 17:59:14 2017 +0100

--
 docs/dev/stream/state.md | 332 +++---
 1 file changed, 314 insertions(+), 18 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/0666786a/docs/dev/stream/state.md
--

diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0b38a62..124ce68 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -26,11 +26,11 @@ under the License.
 {:toc}

 Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for
-any type of more elaborate operation. For example:
+any type of more elaborate operation. For example:

   - When an application searches for certain event patterns, the state will store the sequence of events encountered so far.
   - When aggregating events per minute, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the state holds the current verstion of the model parameters.
+  - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.

 In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it.
 In many cases, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk
@@ -39,40 +39,336 @@ if necessary) to allow applications to hold very large state.

 This document explains how to use Flink's state abstractions when developing an application.

-## Keyed State and Operator state
+## Keyed State and Operator State

-There are two basic state backends: `Keyed State` and `Operator State`.
+There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.

- Keyed State
+### Keyed State

 *Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can create in a function on a `KeyedStream`, as
-well as the state of a keyed window operator.
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by which keyed state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key groups.
+You can think of Keyed State as Operator State that has been partitioned,
+or sharded, with exactly one state-partition per key.
+Each keyed-state is logically bound to a unique
+composite of <parallel-operator-instance, key>, and since each key
+"belongs" to exactly one parallel instance of a keyed operator, we can
+think of this simply as <operator, key>.

- Operator State
+Keyed State is further organized into so-called *Key Groups*. Key Groups are the
+atomic unit by which Flink can redistribute Keyed State;
+there are exactly as many Key Groups as the defined maximum parallelism.
+During execution each parallel instance of a keyed operator works with the keys
+for one or more Key Groups.

-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the Operator State.
+### Operator State

-Operator State needs special re-distribution schemes when parallelism is changed. There can be different variations of such
-schemes; the following are currently defined:
+With *Operator State* (or *non-keyed state*), each operator state is
+bound to one parallel operator instance.
+The Kafka source connector is a good motivating example for the use of Operator State
+in Flink. Each parallel instance of this Kafka consumer maintains a map
+of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among
+parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the
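To make the Key Group description in the patched docs concrete, here is a small, self-contained Java sketch. It is a simplified model only: Flink's real assignment lives in `KeyGroupRangeAssignment` and additionally murmur-hashes the key's hash code, so the class and method names below are illustrative, not Flink API.

```java
/**
 * Simplified model of the Key Group mechanics described in the docs above.
 * NOTE: illustrative only; Flink's real logic also applies murmur hashing.
 */
public class KeyGroupModel {

    /** Every key hash lands in exactly one of maxParallelism Key Groups. */
    static int keyGroupForKey(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    /** Each Key Group belongs to exactly one parallel operator instance. */
    static int operatorIndexForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // With parallelism 2, Key Groups 0..63 map to instance 0 and 64..127 to
        // instance 1: rescaling moves whole Key Groups, never individual keys.
        System.out.println(operatorIndexForKeyGroup(63, 2, maxParallelism));  // 0
        System.out.println(operatorIndexForKeyGroup(64, 2, maxParallelism));  // 1
    }
}
```

This also shows why maximum parallelism bounds the useful parallelism: with more operator instances than Key Groups, some instances would receive no keys at all.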
[2/2] flink git commit: [FLINK-5456] Resurrect and update parts of the state intro documentation
[FLINK-5456] Resurrect and update parts of the state intro documentation

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d27b9fee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d27b9fee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d27b9fee

Branch: refs/heads/master
Commit: d27b9fee5f21997505ad3434f46e5ff1f4e225ed
Parents: ec3eb59
Author: David Anderson
Authored: Wed Jan 18 15:56:18 2017 +0100
Committer: Aljoscha Krettek
Committed: Mon Jan 30 17:58:27 2017 +0100

--
 docs/dev/stream/state.md | 332 +++---
 1 file changed, 314 insertions(+), 18 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/d27b9fee/docs/dev/stream/state.md
--

diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0b38a62..124ce68 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -26,11 +26,11 @@ under the License.
 {:toc}

 Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for
-any type of more elaborate operation. For example:
+any type of more elaborate operation. For example:

   - When an application searches for certain event patterns, the state will store the sequence of events encountered so far.
   - When aggregating events per minute, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the state holds the current verstion of the model parameters.
+  - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.

 In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it.
 In many cases, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk
@@ -39,40 +39,336 @@ if necessary) to allow applications to hold very large state.
 This document explains how to use Flink's state abstractions when developing an application.

-## Keyed State and Operator state
+## Keyed State and Operator State

-There are two basic state backends: `Keyed State` and `Operator State`.
+There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.

- Keyed State
+### Keyed State

 *Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can create in a function on a `KeyedStream`, as
-well as the state of a keyed window operator.
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by which keyed state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key groups.
+You can think of Keyed State as Operator State that has been partitioned,
+or sharded, with exactly one state-partition per key.
+Each keyed-state is logically bound to a unique
+composite of <parallel-operator-instance, key>, and since each key
+"belongs" to exactly one parallel instance of a keyed operator, we can
+think of this simply as <operator, key>.

- Operator State
+Keyed State is further organized into so-called *Key Groups*. Key Groups are the
+atomic unit by which Flink can redistribute Keyed State;
+there are exactly as many Key Groups as the defined maximum parallelism.
+During execution each parallel instance of a keyed operator works with the keys
+for one or more Key Groups.

-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the Operator State.
+### Operator State

-Operator State needs special re-distribution schemes when parallelism is changed. There can be different variations of such
-schemes; the following are currently defined:
+With *Operator State* (or *non-keyed state*), each operator state is
+bound to one parallel operator instance.
+The Kafka source connector is a good motivating example for the use of Operator State
+in Flink. Each parallel instance of this Kafka consumer maintains a map
+of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among
+parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined:
+
+ - **List-style redistribution:** Each operator
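As a rough illustration of redistributing list-style Operator State (e.g. the Kafka consumer's partition/offset entries mentioned above), here is a self-contained sketch. The round-robin dealing used here is a simplification chosen for clarity, not necessarily the exact scheme Flink implements, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of list-style Operator State redistribution: on
 * rescaling, the per-instance lists are concatenated and the entries are
 * dealt out to the new instances. (Round-robin is used here for simplicity.)
 */
public class ListStateRedistribution {

    static <T> List<List<T>> redistribute(List<List<T>> oldState, int newParallelism) {
        // Concatenate the state lists of all old parallel instances.
        List<T> all = new ArrayList<>();
        for (List<T> part : oldState) {
            all.addAll(part);
        }
        // Deal the entries out to the new parallel instances.
        List<List<T>> newState = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newState.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            newState.get(i % newParallelism).add(all.get(i));
        }
        return newState;
    }

    public static void main(String[] args) {
        // Two consumer-like instances, each holding partition-offset entries.
        List<List<String>> old = List.of(
                List.of("topicA-0@42", "topicA-1@17"),
                List.of("topicA-2@99"));
        // Rescale to three instances: every entry lands on exactly one of them.
        System.out.println(redistribute(old, 3));
    }
}
```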
flink git commit: [FLINK-5681] [runtime] Make ReaperThread for SafetyNetCloseableRegistry a singleton
Repository: flink
Updated Branches:
  refs/heads/release-1.2 3e2e49fd9 -> c365a34b8

[FLINK-5681] [runtime] Make ReaperThread for SafetyNetCloseableRegistry a singleton

This closes #3230

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c365a34b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c365a34b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c365a34b

Branch: refs/heads/release-1.2
Commit: c365a34b83e913d9c6c509627fae784435b056a2
Parents: 3e2e49f
Author: Stefan Richter
Authored: Fri Jan 27 19:51:21 2017 +0100
Committer: Stephan Ewen
Committed: Mon Jan 30 16:47:19 2017 +0100

--
 .../core/fs/SafetyNetCloseableRegistry.java     | 122 ---
 .../core/fs/SafetyNetCloseableRegistryTest.java |  66 +++---
 2 files changed, 128 insertions(+), 60 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/c365a34b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
--

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index de4fb30..8b28fa2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -18,20 +18,21 @@
 package org.apache.flink.core.fs;

+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.AbstractCloseableRegistry;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingProxyUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.annotation.concurrent.GuardedBy;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
 import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;

 /**
@@ -45,19 +46,35 @@ import java.util.Map;
  *
  * All methods in this class are thread-safe.
  */
+@Internal
 public class SafetyNetCloseableRegistry extends AbstractCloseableRegistry {

   private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);

-  private final ReferenceQueue referenceQueue;
-  private final Thread reaperThread;
+
+  /** Lock for accessing reaper thread and registry count */
+  private static final Object REAPER_THREAD_LOCK = new Object();
+
+  /** Singleton reaper thread takes care of all registries in VM */
+  @GuardedBy("REAPER_THREAD_LOCK")
+  private static CloseableReaperThread REAPER_THREAD = null;
+
+  /** Global count of all instances of SafetyNetCloseableRegistry */
+  @GuardedBy("REAPER_THREAD_LOCK")
+  private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;

   public SafetyNetCloseableRegistry() {
     super(new IdentityHashMap ());
-    this.referenceQueue = new ReferenceQueue<>();
-    this.reaperThread = new CloseableReaperThread();
-    reaperThread.start();
+
+    synchronized (REAPER_THREAD_LOCK) {
+      if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
+        Preconditions.checkState(null == REAPER_THREAD);
+        REAPER_THREAD = new CloseableReaperThread();
+        REAPER_THREAD.start();
+      }
+      ++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
+    }
   }

   @Override
@@ -65,14 +82,18 @@ public class SafetyNetCloseableRegistry extends
       WrappingProxyCloseable wrappingProxyCloseable,
       Map closeableMap) throws IOException {
+
+    assert Thread.holdsLock(getSynchronizationLock());

     Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());

     if (null == innerCloseable) {
       return;
     }

-    PhantomDelegatingCloseableRef phantomRef =
-      new PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+    PhantomDelegatingCloseableRef phantomRef = new
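The pattern this commit introduces — one shared reaper thread, reference-counted across all registry instances in the JVM, started on the first instance and stopped when the last one closes — can be sketched in isolation. The class below is illustrative only: it is not Flink's `SafetyNetCloseableRegistry`, and its idle sleep stands in for the real phantom-reference reaping loop.

```java
/**
 * Illustrative sketch of a refcounted singleton background thread:
 * started when the global instance count goes 0 -> 1, stopped when it
 * drops back to 0. Mirrors the structure of the diff above, not its code.
 */
public class RefCountedReaper {

    private static final Object LOCK = new Object();
    private static Thread reaperThread = null;
    private static int instanceCount = 0;

    public RefCountedReaper() {
        synchronized (LOCK) {
            if (instanceCount == 0) {
                reaperThread = new Thread(() -> {
                    try {
                        Thread.sleep(Long.MAX_VALUE); // stand-in for the reaping loop
                    } catch (InterruptedException ignored) {
                        // interrupted when the last instance closes
                    }
                });
                reaperThread.setDaemon(true);
                reaperThread.start();
            }
            ++instanceCount;
        }
    }

    public void close() {
        synchronized (LOCK) {
            if (--instanceCount == 0) {
                reaperThread.interrupt();
                reaperThread = null;
            }
        }
    }

    static boolean reaperRunning() {
        synchronized (LOCK) {
            return reaperThread != null;
        }
    }

    public static void main(String[] args) {
        RefCountedReaper first = new RefCountedReaper();
        RefCountedReaper second = new RefCountedReaper(); // reuses the running thread
        first.close();
        second.close(); // last close stops the thread
        System.out.println(reaperRunning()); // false
    }
}
```

The design point is that a thread per registry instance does not scale when registries are created per operation; a single shared thread plus a count bounded the resource usage.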
flink git commit: [FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker
Repository: flink
Updated Branches:
  refs/heads/release-1.2 f523deca8 -> 3e2e49fd9

[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

This closes #3215.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e2e49fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e2e49fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e2e49fd

Branch: refs/heads/release-1.2
Commit: 3e2e49fd959bc185770b64fa89240f6a7ec80f02
Parents: f523dec
Author: Ufuk Celebi
Authored: Wed Jan 25 15:42:24 2017 +0100
Committer: Ufuk Celebi
Committed: Mon Jan 30 16:33:52 2017 +0100

--
 .../checkpoints/CheckpointConfigHandler.java    |  8 +-
 .../CheckpointStatsDetailsHandler.java          |  7 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  7 +-
 .../checkpoints/CheckpointStatsHandler.java     |  7 +-
 .../CheckpointConfigHandlerTest.java            | 13 +--
 .../CheckpointStatsDetailsHandlerTest.java      | 13 +--
 .../checkpoints/CheckpointStatsHandlerTest.java |  5 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +--
 .../checkpoint/AbstractCheckpointStats.java     |  5 +-
 .../checkpoint/CheckpointStatsHistory.java      |  4 +-
 .../checkpoint/CheckpointStatsTracker.java      | 16 ++--
 .../checkpoint/CompletedCheckpointStats.java    | 26 +++---
 .../checkpoint/FailedCheckpointStats.java       | 24 +++---
 .../checkpoint/PendingCheckpointStats.java      |  4 +-
 .../checkpoint/RestoredCheckpointStats.java     |  2 +-
 .../runtime/checkpoint/SubtaskStateStats.java   |  8 +-
 .../runtime/checkpoint/TaskStateStats.java      |  9 ++-
 .../executiongraph/AccessExecutionGraph.java    | 20 +++--
 .../executiongraph/ArchivedExecutionGraph.java  | 54 -
 .../runtime/executiongraph/ExecutionGraph.java  | 22 -
 .../tasks/ExternalizedCheckpointSettings.java   |  2 +
 .../checkpoint/CheckpointStatsHistoryTest.java  |  1 -
 .../checkpoint/CheckpointStatsSnapshotTest.java | 84 
 .../checkpoint/CompletedCheckpointTest.java     | 35 
 .../checkpoint/FailedCheckpointStatsTest.java   | 40 ++
 .../checkpoint/PendingCheckpointStatsTest.java  | 38 +++--
 .../checkpoint/SubtaskStateStatsTest.java       | 36 +
 .../runtime/checkpoint/TaskStateStatsTest.java  | 46 ++-
 .../ArchivedExecutionGraphTest.java             |  7 +-
 .../tasks/JobSnapshottingSettingsTest.java      | 59 ++
 30 files changed, 490 insertions(+), 125 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
--

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 1ad5e65..be0d283 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;

 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
   @Override
   public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception {
     StringWriter writer = new StringWriter();
+    JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+    JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
-    CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-    JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
+    if (settings == null) {
+      return "{}";
+    }

     gen.writeStartObject();
     {

http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
--

diff --git
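The underlying issue this commit fixes — a web handler reaching into a live, non-serializable stats tracker instead of reading a serializable snapshot off the (possibly archived) execution graph — can be demonstrated with plain Java serialization. All class names below are illustrative, not Flink's API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Illustrative sketch: a live tracker cannot be serialized; a snapshot can. */
public class SnapshotSketch {

    /** Not Serializable: holds live runtime state (here, a Thread). */
    static class LiveTracker {
        int completedCheckpoints = 3;
        Thread worker = new Thread();

        /** Archive the interesting numbers into a plain serializable object. */
        Snapshot createSnapshot() {
            return new Snapshot(completedCheckpoints);
        }
    }

    static class Snapshot implements Serializable {
        final int completedCheckpoints;
        Snapshot(int n) { this.completedCheckpoints = n; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        LiveTracker tracker = new LiveTracker();
        try {
            serialize(tracker);
            System.out.println("live tracker serialized");
        } catch (NotSerializableException e) {
            System.out.println("live tracker not serializable"); // this branch runs
        }
        // The archived snapshot serializes fine.
        System.out.println(serialize(tracker.createSnapshot()).length > 0); // true
    }
}
```

This is why the handlers above now read `JobSnapshottingSettings` directly from `AccessExecutionGraph` (which an archived graph can also provide) and fall back to `{}` when no settings exist.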
flink git commit: [FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker
Repository: flink
Updated Branches:
  refs/heads/master 126fb1779 -> dcfa3fbb0

[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

This closes #3215.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfa3fbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfa3fbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfa3fbb

Branch: refs/heads/master
Commit: dcfa3fbb0f17400ebf823e10f803cde8563fff4a
Parents: 126fb17
Author: Ufuk Celebi
Authored: Wed Jan 25 15:42:24 2017 +0100
Committer: Ufuk Celebi
Committed: Mon Jan 30 16:33:22 2017 +0100

--
 .../checkpoints/CheckpointConfigHandler.java    |  8 +-
 .../CheckpointStatsDetailsHandler.java          |  7 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  7 +-
 .../checkpoints/CheckpointStatsHandler.java     |  7 +-
 .../CheckpointConfigHandlerTest.java            | 13 +--
 .../CheckpointStatsDetailsHandlerTest.java      | 13 +--
 .../checkpoints/CheckpointStatsHandlerTest.java |  5 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +--
 .../checkpoint/AbstractCheckpointStats.java     |  5 +-
 .../checkpoint/CheckpointStatsHistory.java      |  4 +-
 .../checkpoint/CheckpointStatsTracker.java      | 16 ++--
 .../checkpoint/CompletedCheckpointStats.java    | 26 +++---
 .../checkpoint/FailedCheckpointStats.java       | 24 +++---
 .../checkpoint/PendingCheckpointStats.java      |  4 +-
 .../checkpoint/RestoredCheckpointStats.java     |  2 +-
 .../runtime/checkpoint/SubtaskStateStats.java   |  8 +-
 .../runtime/checkpoint/TaskStateStats.java      |  9 ++-
 .../executiongraph/AccessExecutionGraph.java    | 20 +++--
 .../executiongraph/ArchivedExecutionGraph.java  | 54 -
 .../runtime/executiongraph/ExecutionGraph.java  | 22 -
 .../tasks/ExternalizedCheckpointSettings.java   |  2 +
 .../checkpoint/CheckpointStatsHistoryTest.java  |  1 -
 .../checkpoint/CheckpointStatsSnapshotTest.java | 84 
 .../checkpoint/CompletedCheckpointTest.java     | 35 
 .../checkpoint/FailedCheckpointStatsTest.java   | 40 ++
 .../checkpoint/PendingCheckpointStatsTest.java  | 38 +++--
 .../checkpoint/SubtaskStateStatsTest.java       | 36 +
 .../runtime/checkpoint/TaskStateStatsTest.java  | 46 ++-
 .../ArchivedExecutionGraphTest.java             |  7 +-
 .../tasks/JobSnapshottingSettingsTest.java      | 59 ++
 30 files changed, 490 insertions(+), 125 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
--

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 1ad5e65..be0d283 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;

 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
   @Override
   public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception {
     StringWriter writer = new StringWriter();
+    JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+    JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
-    CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-    JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
+    if (settings == null) {
+      return "{}";
+    }

     gen.writeStartObject();
     {

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
--

diff --git
flink git commit: [FLINK-5678] [table] Fix User-defined Functions do not support all types of parameters
Repository: flink
Updated Branches:
  refs/heads/release-1.2 617ff50c6 -> f523deca8

[FLINK-5678] [table] Fix User-defined Functions do not support all types of parameters

This closes #3233.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f523deca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f523deca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f523deca

Branch: refs/heads/release-1.2
Commit: f523deca8955257a6aeaafe4012a871307ea607a
Parents: 617ff50
Author: Jark Wu
Authored: Sun Jan 29 18:36:28 2017 +0600
Committer: twalthr
Committed: Mon Jan 30 14:56:44 2017 +0100

--
 .../codegen/calls/ScalarFunctionCallGen.scala   | 12 +++
 .../codegen/calls/TableFunctionCallGen.scala    | 12 +++
 .../flink/table/expressions/literals.scala      |  5 +++
 .../utils/UserDefinedFunctionUtils.scala        | 12 +++
 .../java/utils/UserDefinedScalarFunctions.java  | 36 
 .../java/utils/UserDefinedTableFunctions.java   | 31 +
 .../UserDefinedScalarFunctionTest.scala         | 36 +++-
 .../dataset/DataSetCorrelateITCase.scala        | 24 +
 8 files changed, 161 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/f523deca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
--

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index ac840df..7ff18eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,11 +18,13 @@
 package org.apache.flink.table.codegen.calls

+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils

 /**
  * Generates a call to user-defined [[ScalarFunction]].
@@ -52,6 +54,16 @@ class ScalarFunctionCallGen(
       .map { case (paramClass, operandExpr) =>
         if (paramClass.isPrimitive) {
           operandExpr
+        } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+          && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
+          // we use primitives to represent temporal types internally, so no casting needed here
+          val exprOrNull: String = if (codeGenerator.nullCheck) {
+            s"${operandExpr.nullTerm} ? null : " +
+              s"(${paramClass.getCanonicalName}) ${operandExpr.resultTerm}"
+          } else {
+            operandExpr.resultTerm
+          }
+          operandExpr.copy(resultTerm = exprOrNull)
         } else {
           val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
           val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)

http://git-wip-us.apache.org/repos/asf/flink/blob/f523deca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
--

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index 6e44f55..890b6bd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -18,12 +18,14 @@
 package org.apache.flink.table.codegen.calls

+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils

 /**
 *
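The new code-generation branch above fires when a user-defined function declares a boxed parameter (e.g. `java.lang.Integer`) while the operand is a temporal type that the runtime represents as a primitive. Here is a self-contained Java sketch of that check and the cast term it emits, with a hand-rolled stand-in for commons-lang3's `ClassUtils.isPrimitiveWrapper` and illustrative term names (`result$1`, `isNull$1` are made up for the example):

```java
import java.util.Set;

/** Illustrative model of the wrapper check and generated cast term above. */
public class WrapperCheck {

    // Stand-in for org.apache.commons.lang3.ClassUtils.isPrimitiveWrapper.
    private static final Set<Class<?>> WRAPPERS = Set.of(
            Boolean.class, Byte.class, Short.class, Character.class,
            Integer.class, Long.class, Float.class, Double.class);

    static boolean isPrimitiveWrapper(Class<?> clazz) {
        return WRAPPERS.contains(clazz);
    }

    /**
     * Emit the code snippet for one operand, mirroring the diff: with null
     * checking on, guard with the null term and cast the primitive-backed
     * value to the declared wrapper class; otherwise pass the term through.
     */
    static String castTerm(Class<?> paramClass, String resultTerm, String nullTerm, boolean nullCheck) {
        if (nullCheck) {
            return nullTerm + " ? null : (" + paramClass.getCanonicalName() + ") " + resultTerm;
        }
        return resultTerm;
    }

    public static void main(String[] args) {
        System.out.println(isPrimitiveWrapper(Integer.class)); // true
        System.out.println(isPrimitiveWrapper(int.class));     // false
        System.out.println(castTerm(Integer.class, "result$1", "isNull$1", true));
    }
}
```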
flink git commit: [FLINK-5678] [table] Fix User-defined Functions do not support all types of parameters
Repository: flink
Updated Branches:
  refs/heads/master 50b665677 -> 126fb1779

[FLINK-5678] [table] Fix User-defined Functions do not support all types of parameters

This closes #3233.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/126fb177
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/126fb177
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/126fb177

Branch: refs/heads/master
Commit: 126fb1779b84a934dd6bb632c96a7666ac8c521e
Parents: 50b6656
Author: Jark Wu
Authored: Sun Jan 29 18:36:28 2017 +0600
Committer: twalthr
Committed: Mon Jan 30 14:51:57 2017 +0100

--
 .../codegen/calls/ScalarFunctionCallGen.scala   | 12 +++
 .../codegen/calls/TableFunctionCallGen.scala    | 12 +++
 .../flink/table/expressions/literals.scala      |  5 +++
 .../utils/UserDefinedFunctionUtils.scala        | 12 +++
 .../java/utils/UserDefinedScalarFunctions.java  | 36 
 .../java/utils/UserDefinedTableFunctions.java   | 31 +
 .../UserDefinedScalarFunctionTest.scala         | 36 +++-
 .../dataset/DataSetCorrelateITCase.scala        | 26 +-
 8 files changed, 162 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/126fb177/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
--

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index ac840df..7ff18eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,11 +18,13 @@
 package org.apache.flink.table.codegen.calls

+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils

 /**
  * Generates a call to user-defined [[ScalarFunction]].
@@ -52,6 +54,16 @@ class ScalarFunctionCallGen(
       .map { case (paramClass, operandExpr) =>
         if (paramClass.isPrimitive) {
           operandExpr
+        } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+          && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
+          // we use primitives to represent temporal types internally, so no casting needed here
+          val exprOrNull: String = if (codeGenerator.nullCheck) {
+            s"${operandExpr.nullTerm} ? null : " +
+              s"(${paramClass.getCanonicalName}) ${operandExpr.resultTerm}"
+          } else {
+            operandExpr.resultTerm
+          }
+          operandExpr.copy(resultTerm = exprOrNull)
         } else {
           val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
           val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)

http://git-wip-us.apache.org/repos/asf/flink/blob/126fb177/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
--

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index 6e44f55..890b6bd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -18,12 +18,14 @@
 package org.apache.flink.table.codegen.calls

+import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.typeutils.TypeCheckUtils

 /**
 * Generates a