flink git commit: [hotfix] Replace HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE with HA_CLUSTER_ID

2017-12-29 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master c4acbb838 -> b5db8d908


[hotfix] Replace HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE with HA_CLUSTER_ID


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5db8d90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5db8d90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5db8d90

Branch: refs/heads/master
Commit: b5db8d90818efb96ac407ccc213f2892f3852321
Parents: c4acbb8
Author: Till Rohrmann 
Authored: Fri Dec 29 16:06:59 2017 +0100
Committer: Till Rohrmann 
Committed: Fri Dec 29 16:06:59 2017 +0100

--
 .../java/org/apache/flink/configuration/ConfigConstants.java   | 4 ++--
 .../apache/flink/configuration/HighAvailabilityOptions.java    | 5 -----
 .../java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    | 6 +++---
 3 files changed, 5 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b5db8d90/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5fd7085..50039ac 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1031,7 +1031,7 @@ public final class ConfigConstants {
@Deprecated
public static final String HA_ZOOKEEPER_DIR_KEY = 
"high-availability.zookeeper.path.root";
 
-   /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
+   /** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_NAMESPACE_KEY = 
"high-availability.zookeeper.path.namespace";
@@ -1788,7 +1788,7 @@ public final class ConfigConstants {
@Deprecated
public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
 
-   /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
+   /** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5db8d90/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 2b026b9..6ee9f94 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -100,11 +100,6 @@ public class HighAvailabilityOptions {
.defaultValue("/flink")
.withDeprecatedKeys("recovery.zookeeper.path.root");
 
-   public static final ConfigOption HA_ZOOKEEPER_NAMESPACE =
-   key("high-availability.zookeeper.path.namespace")
-   .noDefaultValue()
-   
.withDeprecatedKeys("recovery.zookeeper.path.namespace");
-
public static final ConfigOption HA_ZOOKEEPER_LATCH_PATH =
key("high-availability.zookeeper.path.latch")
.defaultValue("/leaderlatch")

http://git-wip-us.apache.org/repos/asf/flink/blob/b5db8d90/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index c903a76..df4ef1f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -73,7 +73,7 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.HighAvailabilityOptions.HA_ZOOKEEPER_NAMESPACE;
+import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -644,9 +644,9 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine
  

flink git commit: [FLINK-8227] Optimize the performance of SharedBufferSerializer

2017-12-29 Thread dwysakowicz
Repository: flink
Updated Branches:
  refs/heads/master 91f00ec91 -> c4acbb838


[FLINK-8227] Optimize the performance of SharedBufferSerializer

This closes #5142


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4acbb83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4acbb83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4acbb83

Branch: refs/heads/master
Commit: c4acbb838ffc9582436f976bfb7eaff838a0ed87
Parents: 91f00ec
Author: Dian Fu 
Authored: Sat Dec 9 11:51:04 2017 +0800
Committer: Dawid Wysakowicz 
Committed: Fri Dec 29 13:52:37 2017 +0100

--
 .../java/org/apache/flink/cep/nfa/SharedBuffer.java   | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c4acbb83/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 0cf47ca..29e8dc2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -527,6 +527,7 @@ public class SharedBuffer 
implements Serializable {
private final Set> edges;
private final SharedBufferPage page;
private int referenceCounter;
+   private transient int entryId;
 
SharedBufferEntry(
final ValueTimeWrapper valueTime,
@@ -547,6 +548,8 @@ public class SharedBuffer 
implements Serializable {
 
referenceCounter = 0;
 
+   entryId = -1;
+
this.page = page;
}
 
@@ -886,7 +889,6 @@ public class SharedBuffer 
implements Serializable {
@Override
public void serialize(SharedBuffer record, DataOutputView 
target) throws IOException {
Map> pages = record.pages;
-   Map, Integer> entryIDs = new 
HashMap<>();
 
int totalEdges = 0;
int entryCounter = 0;
@@ -908,7 +910,7 @@ public class SharedBuffer 
implements Serializable {
 
// assign id to the sharedBufferEntry 
for the future
// serialization of the previous 
relation
-   entryIDs.put(sharedBuffer, 
entryCounter++);
+   sharedBuffer.entryId = entryCounter++;
 
ValueTimeWrapper valueTimeWrapper = 
sharedBuffer.getValueTime();
 
@@ -932,15 +934,15 @@ public class SharedBuffer 
implements Serializable {
for (Map.Entry> sharedBufferEntry: page.entries.entrySet()) {
SharedBufferEntry sharedBuffer = 
sharedBufferEntry.getValue();
 
-   Integer id = entryIDs.get(sharedBuffer);
-   Preconditions.checkState(id != null, 
"Could not find id for entry: " + sharedBuffer);
+   int id = sharedBuffer.entryId;
+   Preconditions.checkState(id != -1, 
"Could not find id for entry: " + sharedBuffer);
 
for (SharedBufferEdge edge: 
sharedBuffer.edges) {
// in order to serialize the 
previous relation we simply serialize the ids
// of the source and target 
SharedBufferEntry
if (edge.target != null) {
-   Integer targetId = 
entryIDs.get(edge.getTarget());
-   
Preconditions.checkState(targetId != null,
+   int targetId = 
edge.getTarget().entryId;
+   
Preconditions.checkState(targetId != -1,
"Could 
not find id for entry: " + edge.getTarget());
 
target.writeInt(id);



flink git commit: [FLINK-8312][table] Fix ScalarFunction varargs length exceed 254

2017-12-29 Thread jincheng
Repository: flink
Updated Branches:
  refs/heads/master d74869f8a -> 91f00ec91


[FLINK-8312][table] Fix ScalarFunction varargs length exceed 254

This closes #5206


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f00ec9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f00ec9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f00ec9

Branch: refs/heads/master
Commit: 91f00ec91b55b28a59114e88127af2f85f279eb5
Parents: d74869f
Author: Xpray 
Authored: Tue Dec 26 17:40:14 2017 +0800
Committer: sunjincheng121 
Committed: Fri Dec 29 17:49:32 2017 +0800

--
 .../functions/utils/ScalarSqlFunction.scala  | 27 +++++++++++++++++----------
 .../table/runtime/stream/sql/SqlITCase.scala | 27 +++++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/91f00ec9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index 27e093d..cbe2ac7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -140,7 +140,7 @@ object ScalarSqlFunction {
   scalarFunction: ScalarFunction)
 : SqlOperandTypeChecker = {
 
-val signatures = getMethodSignatures(scalarFunction, "eval")
+val methods = checkAndExtractMethods(scalarFunction, "eval")
 
 /**
   * Operand type checker based on [[ScalarFunction]] given information.
@@ -151,17 +151,24 @@ object ScalarSqlFunction {
   }
 
   override def getOperandCountRange: SqlOperandCountRange = {
-var min = 255
+var min = 254 // according to JVM spec 4.3.3
 var max = -1
-signatures.foreach( sig => {
-  var len = sig.length
-  if (len > 0 && sig(sig.length - 1).isArray) {
-max = 254  // according to JVM spec 4.3.3
-len = sig.length - 1
+var isVarargs = false
+methods.foreach(
+  m => {
+var len = m.getParameterTypes.length
+if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 
1).isArray) {
+  isVarargs = true
+  len = len - 1
+}
+max = Math.max(len, max)
+min = Math.min(len, min)
   }
-  max = Math.max(len, max)
-  min = Math.min(len, min)
-})
+)
+if (isVarargs) {
+  // if eval method is varargs, set max to -1 to skip length check in 
Calcite
+  max = -1
+}
 SqlOperandCountRanges.between(min, max)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91f00ec9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 18b45a3..76d0126 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -26,10 +26,12 @@ import 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.SplitUDF
+import org.apache.flink.table.expressions.utils.Func15
 import 
org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.apache.flink.table.utils.MemoryTableSinkUtil
+
 import org.junit.Assert._
 import org.junit._
 
@@ -516,4 +518,29 @@ class SqlITCase extends StreamingWithStateTestBase {
 val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testUDFWithLongVarargs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+tEnv.registerFunction("func15",