buildbot success in on flink-docs-master
The Buildbot has detected a restored build on builder flink-docs-master while building . Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/426 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave3_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build Build Source Stamp: [branch master] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
flink git commit: [FLINK-4394] RMQSource: QueueName accessible for subclasses
Repository: flink Updated Branches: refs/heads/master 90fdae452 -> 5ccd90715 [FLINK-4394] RMQSource: QueueName accessible for subclasses The queueName is needed if the subclasses override `setupQueue`. This closes #2373 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ccd9071 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ccd9071 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ccd9071 Branch: refs/heads/master Commit: 5ccd9071580e196d150905b2d05eef71e399a24c Parents: 90fdae4 Author: Dominik Authored: Mon Aug 15 09:41:57 2016 +0200 Committer: Robert Metzger Committed: Mon Aug 15 16:19:49 2016 +0200 -- .../org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5ccd9071/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java -- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 33cf52c..ee9c3b9 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -74,7 +74,7 @@ public class RMQSource extends MultipleIdsMessageAcknowledgingSourceBase schema;
flink git commit: [FLINK-3097] [table] Add support for custom functions in Table API
Repository: flink Updated Branches: refs/heads/master 83c4b9707 -> 90fdae452 [FLINK-3097] [table] Add support for custom functions in Table API This closes #2265. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90fdae45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90fdae45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90fdae45 Branch: refs/heads/master Commit: 90fdae452b6a03fafd4ec7827030d78ae87dbcd3 Parents: 83c4b97 Author: twalthr Authored: Tue Jul 12 12:22:41 2016 +0200 Committer: twalthr Committed: Mon Aug 15 16:16:27 2016 +0200 -- docs/apis/table.md | 89 .../flink/api/table/FlinkPlannerImpl.scala | 4 +- .../flink/api/table/TableEnvironment.scala | 29 ++- .../flink/api/table/codegen/CodeGenerator.scala | 189 ++- .../codegen/calls/ScalarFunctionCallGen.scala | 95 .../table/codegen/calls/ScalarFunctions.scala | 49 ++-- .../org/apache/flink/api/table/exceptions.scala | 13 +- .../flink/api/table/expressions/call.scala | 52 - .../api/table/functions/ScalarFunction.scala| 143 .../table/functions/UserDefinedFunction.scala | 61 + .../functions/utils/ScalarSqlFunction.scala | 180 +++ .../utils/UserDefinedFunctionUtils.scala| 185 +++ .../api/table/plan/RexNodeTranslator.scala | 7 +- .../api/table/validate/FunctionCatalog.scala| 71 -- .../api/java/batch/table/ExpressionsITCase.java | 33 +++ .../api/scala/batch/sql/ExpressionsITCase.scala | 33 ++- .../flink/api/scala/batch/sql/JoinITCase.scala | 3 +- .../api/scala/batch/sql/SelectITCase.scala | 3 +- .../scala/batch/sql/SetOperatorsITCase.scala| 2 + .../UserDefinedScalarFunctionTest.scala | 227 +++ .../expressions/utils/ExpressionTestBase.scala | 10 +- .../utils/UserDefinedScalarFunctions.scala | 121 ++ 22 files changed, 1491 insertions(+), 108 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/90fdae45/docs/apis/table.md -- diff --git a/docs/apis/table.md b/docs/apis/table.md index ea8e343..c2806c6 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -1897,6 +1897,95 @@ TIMESTAMP VARCHAR +### User-defined Scalar Functions + +If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar values to a new scalar value. + +In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.api.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`. + +The following example snippet shows how to define your own hash code function: + + + +{% highlight java %} +public static class HashCode extends ScalarFunction { + public int eval(String s) { +return s.hashCode() * 12; + } +} + +BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// register the function +tableEnv.registerFunction("hashCode", new HashCode()) + +// use the function in Java Table API +myTable.select("string, string.hashCode(), hashCode(string)"); + +// use the function in SQL API +tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable"); +{% endhighlight %} + + + +{% highlight scala %} +// must be defined in static/object context +object hashCode extends ScalarFunction { + def eval(s: String): Int = { +s.hashCode() * 12 + } +} + +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// use the function in Scala Table API +myTable.select('string, hashCode('string)) + +// register and use the function in SQL +tableEnv.registerFunction("hashCode", hashCode) +tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable"); +{% endhighlight %} + + + +By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`. + +Internally, the Table API and SQL code generation works with primitive values as much as possible. If a user-defined scal
flink git commit: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocation messages
Repository: flink Updated Branches: refs/heads/flip-6 363f07cc4 -> e8a434647 [FLINK-4383] [rpc] Eagerly serialize remote rpc invocation messages This PR introduces an eager serialization for remote rpc invocation messages. That way it is possible to check whether the message is serializable and whether it exceeds the maximum allowed akka frame size. If either of these constraints is violated, a proper exception is thrown instead of simply swallowing the exception as Akka does it. Address PR comments This closes #2365. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8a43464 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8a43464 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8a43464 Branch: refs/heads/flip-6 Commit: e8a434647bac8009b11779a3943c5d63871fcb14 Parents: 363f07c Author: Till Rohrmann Authored: Fri Aug 12 10:32:30 2016 +0200 Committer: Till Rohrmann Committed: Mon Aug 15 16:07:28 2016 +0200 -- .../flink/runtime/rpc/akka/AkkaGateway.java | 2 +- .../runtime/rpc/akka/AkkaInvocationHandler.java | 83 ++-- .../flink/runtime/rpc/akka/AkkaRpcActor.java| 26 ++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 20 +- .../rpc/akka/messages/LocalRpcInvocation.java | 54 + .../rpc/akka/messages/RemoteRpcInvocation.java | 206 ++ .../rpc/akka/messages/RpcInvocation.java| 106 +++--- .../runtime/rpc/akka/AkkaRpcServiceTest.java| 2 +- .../rpc/akka/MessageSerializationTest.java | 210 +++ 9 files changed, 597 insertions(+), 112 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e8a43464/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java index ec3091c..f6125dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java @@ -26,5 +26,5 @@ import org.apache.flink.runtime.rpc.RpcGateway; */ interface AkkaGateway extends RpcGateway { - ActorRef getRpcServer(); + ActorRef getRpcEndpoint(); } http://git-wip-us.apache.org/repos/asf/flink/blob/e8a43464/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 580b161..297104b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -25,13 +25,17 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.rpc.MainThreadExecutor; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; +import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation; +import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.apache.flink.util.Preconditions; +import org.apache.log4j.Logger; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -42,19 +46,28 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkArgument; /** - * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the - * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is + * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the + * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor { - private final ActorRef rpcServer; + private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); + + private final ActorRef rpcEndpoint; + + // whether the actor ref is local and thus no message serialization is needed +
flink git commit: [FLINK-4385] [table] Union on Timestamp fields does not work
Repository: flink Updated Branches: refs/heads/master 79cc30f3d -> 83c4b9707 [FLINK-4385] [table] Union on Timestamp fields does not work This closes #2362. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83c4b970 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83c4b970 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83c4b970 Branch: refs/heads/master Commit: 83c4b9707dd24425391bd5759f12878ad2f19175 Parents: 79cc30f Author: Jark Wu Authored: Fri Aug 12 17:48:23 2016 +0800 Committer: twalthr Committed: Mon Aug 15 09:27:14 2016 +0200 -- .../apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala | 5 - .../org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/83c4b970/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala index 08e0c41..9ce1580 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -62,7 +62,10 @@ trait DataSetRel extends RelNode with FlinkRel { case SqlTypeName.VARCHAR => s + 12 case SqlTypeName.CHAR => s + 1 case SqlTypeName.DECIMAL => s + 12 -case _ => throw new TableException("Unsupported data type encountered") +case SqlTypeName.INTERVAL_DAY_TIME => s + 8 +case SqlTypeName.INTERVAL_YEAR_MONTH => s + 4 +case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12 +case _ => throw TableException(s"Unsupported data type encountered: $t") } } http://git-wip-us.apache.org/repos/asf/flink/blob/83c4b970/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala index 85524fb..bf5cb58 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala @@ -97,7 +97,7 @@ object IntervalTypeInfo { ascendingOrder: java.lang.Boolean) : TypeComparator[X] = { try { - val constructor = comparatorClass.getConstructor(classOf[java.lang.Boolean]) + val constructor = comparatorClass.getConstructor(java.lang.Boolean.TYPE) constructor.newInstance(ascendingOrder) } catch { case e: Exception =>
flink git commit: [FLINK-4387] [runtime] Ignore test in KvStateClientTest
Repository: flink Updated Branches: refs/heads/master 726d7c7ff -> 79cc30f3d [FLINK-4387] [runtime] Ignore test in KvStateClientTest Instable test on TravisCI. Most likely caused by a Netty issue (#4357). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79cc30f3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79cc30f3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79cc30f3 Branch: refs/heads/master Commit: 79cc30f3d5ad84d860236cdd45711f67d9f9278a Parents: 726d7c7 Author: Ufuk Celebi Authored: Mon Aug 15 09:00:40 2016 +0200 Committer: Ufuk Celebi Committed: Mon Aug 15 09:00:46 2016 +0200 -- .../org/apache/flink/runtime/query/netty/KvStateClientTest.java| 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/79cc30f3/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java index ac03f94..4c42318 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemValueState; import org.apache.flink.util.NetUtils; import org.junit.AfterClass; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -515,6 +516,7 @@ public class KvStateClientTest { * that all ongoing requests are failed. */ @Test + @Ignore public void testClientServerIntegration() throws Exception { // Config final int numServers = 2;