flink git commit: [hotfix] Replace HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE with HA_CLUSTER_ID
Repository: flink Updated Branches: refs/heads/master c4acbb838 -> b5db8d908 [hotfix] Replace HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE with HA_CLUSTER_ID Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5db8d90 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5db8d90 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5db8d90 Branch: refs/heads/master Commit: b5db8d90818efb96ac407ccc213f2892f3852321 Parents: c4acbb8 Author: Till Rohrmann Authored: Fri Dec 29 16:06:59 2017 +0100 Committer: Till Rohrmann Committed: Fri Dec 29 16:06:59 2017 +0100 -- .../java/org/apache/flink/configuration/ConfigConstants.java | 4 ++-- .../apache/flink/configuration/HighAvailabilityOptions.java| 5 - .../java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java| 6 +++--- 3 files changed, 5 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b5db8d90/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java -- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 5fd7085..50039ac 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1031,7 +1031,7 @@ public final class ConfigConstants { @Deprecated public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */ + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. 
*/ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace"; @@ -1788,7 +1788,7 @@ public final class ConfigConstants { @Deprecated public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */ + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default"; http://git-wip-us.apache.org/repos/asf/flink/blob/b5db8d90/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java -- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index 2b026b9..6ee9f94 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -100,11 +100,6 @@ public class HighAvailabilityOptions { .defaultValue("/flink") .withDeprecatedKeys("recovery.zookeeper.path.root"); - public static final ConfigOption HA_ZOOKEEPER_NAMESPACE = - key("high-availability.zookeeper.path.namespace") - .noDefaultValue() - .withDeprecatedKeys("recovery.zookeeper.path.namespace"); - public static final ConfigOption HA_ZOOKEEPER_LATCH_PATH = key("high-availability.zookeeper.path.latch") .defaultValue("/leaderlatch") http://git-wip-us.apache.org/repos/asf/flink/blob/b5db8d90/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index c903a76..df4ef1f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -73,7 +73,7 @@ import java.util.Properties; import java.util.concurrent.Callable; import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; -import static org.apache.flink.configuration.HighAvailabilityOptions.HA_ZOOKEEPER_NAMESPACE; +import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID; /** * Class handling the command line interface to the YARN session. @@ -644,9 +644,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine
flink git commit: [FLINK-8227] Optimize the performance of SharedBufferSerializer
Repository: flink Updated Branches: refs/heads/master 91f00ec91 -> c4acbb838 [FLINK-8227] Optimize the performance of SharedBufferSerializer This closes #5142 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4acbb83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4acbb83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4acbb83 Branch: refs/heads/master Commit: c4acbb838ffc9582436f976bfb7eaff838a0ed87 Parents: 91f00ec Author: Dian Fu Authored: Sat Dec 9 11:51:04 2017 +0800 Committer: Dawid Wysakowicz Committed: Fri Dec 29 13:52:37 2017 +0100 -- .../java/org/apache/flink/cep/nfa/SharedBuffer.java | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c4acbb83/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index 0cf47ca..29e8dc2 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -527,6 +527,7 @@ public class SharedBuffer implements Serializable { private final Set > edges; private final SharedBufferPage page; private int referenceCounter; + private transient int entryId; SharedBufferEntry( final ValueTimeWrapper valueTime, @@ -547,6 +548,8 @@ public class SharedBuffer implements Serializable { referenceCounter = 0; + entryId = -1; + this.page = page; } @@ -886,7 +889,6 @@ public class SharedBuffer implements Serializable { @Override public void serialize(SharedBuffer record, DataOutputView target) throws IOException { Map > pages = record.pages; - Map , Integer> entryIDs = new HashMap<>(); int totalEdges = 0; int entryCounter = 0; @@ -908,7 +910,7 @@ public class SharedBuffer 
implements Serializable { // assign id to the sharedBufferEntry for the future // serialization of the previous relation - entryIDs.put(sharedBuffer, entryCounter++); + sharedBuffer.entryId = entryCounter++; ValueTimeWrapper valueTimeWrapper = sharedBuffer.getValueTime(); @@ -932,15 +934,15 @@ public class SharedBuffer implements Serializable { for (Map.Entry > sharedBufferEntry: page.entries.entrySet()) { SharedBufferEntry sharedBuffer = sharedBufferEntry.getValue(); - Integer id = entryIDs.get(sharedBuffer); - Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer); + int id = sharedBuffer.entryId; + Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer); for (SharedBufferEdge edge: sharedBuffer.edges) { // in order to serialize the previous relation we simply serialize the ids // of the source and target SharedBufferEntry if (edge.target != null) { - Integer targetId = entryIDs.get(edge.getTarget()); - Preconditions.checkState(targetId != null, + int targetId = edge.getTarget().entryId; + Preconditions.checkState(targetId != -1, "Could not find id for entry: " + edge.getTarget()); target.writeInt(id);
flink git commit: [FLINK-8312][table] Fix ScalarFunction varargs length exceed 254
Repository: flink Updated Branches: refs/heads/master d74869f8a -> 91f00ec91 [FLINK-8312][table] Fix ScalarFunction varargs length exceed 254 This closes #5206 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f00ec9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f00ec9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f00ec9 Branch: refs/heads/master Commit: 91f00ec91b55b28a59114e88127af2f85f279eb5 Parents: d74869f Author: Xpray Authored: Tue Dec 26 17:40:14 2017 +0800 Committer: sunjincheng121 Committed: Fri Dec 29 17:49:32 2017 +0800 -- .../functions/utils/ScalarSqlFunction.scala | 27 .../table/runtime/stream/sql/SqlITCase.scala| 27 2 files changed, 44 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/91f00ec9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala index 27e093d..cbe2ac7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala @@ -140,7 +140,7 @@ object ScalarSqlFunction { scalarFunction: ScalarFunction) : SqlOperandTypeChecker = { -val signatures = getMethodSignatures(scalarFunction, "eval") +val methods = checkAndExtractMethods(scalarFunction, "eval") /** * Operand type checker based on [[ScalarFunction]] given information. 
@@ -151,17 +151,24 @@ object ScalarSqlFunction { } override def getOperandCountRange: SqlOperandCountRange = { -var min = 255 +var min = 254 // according to JVM spec 4.3.3 var max = -1 -signatures.foreach( sig => { - var len = sig.length - if (len > 0 && sig(sig.length - 1).isArray) { -max = 254 // according to JVM spec 4.3.3 -len = sig.length - 1 +var isVarargs = false +methods.foreach( + m => { +var len = m.getParameterTypes.length +if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) { + isVarargs = true + len = len - 1 +} +max = Math.max(len, max) +min = Math.min(len, min) } - max = Math.max(len, max) - min = Math.min(len, min) -}) +) +if (isVarargs) { + // if eval method is varargs, set max to -1 to skip length check in Calcite + max = -1 +} SqlOperandCountRanges.between(min, max) } http://git-wip-us.apache.org/repos/asf/flink/blob/91f00ec9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 18b45a3..76d0126 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -26,10 +26,12 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{TableEnvironment, Types} import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.SplitUDF +import org.apache.flink.table.expressions.utils.Func15 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row import 
org.apache.flink.table.utils.MemoryTableSinkUtil + import org.junit.Assert._ import org.junit._ @@ -516,4 +518,29 @@ class SqlITCase extends StreamingWithStateTestBase { val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testUDFWithLongVarargs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear + +tEnv.registerFunction("func15",