[GitHub] flink pull request #6356: [FLINK-9790] [doc] Add documentation for UDF in SQ...

2018-07-17 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/6356

[FLINK-9790] [doc] Add documentation for UDF in SQL Client

## What is the purpose of the change

This PR aims to add documentation for UDFs in the SQL Client.

## Brief change log

  - Add documentation for SQL Client UDFs in `sqlClient.md`.
  - Add an `incremental` parameter to the build script so that the build 
process can be more efficient.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-9790-udfdoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6356.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6356


commit da3a480c60b6a840bd5f558bb2bef43fe6822d33
Author: Xingcan Cui 
Date:   2018-07-17T16:22:26Z

[FLINK-9790] [doc] Add documentation for UDF in SQL Client




---


[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

2018-07-06 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6090
  
Hi @twalthr, I've made some changes to the PR.
1. Add a normalize method to `ClassTypeValidator` which converts a config 
like `constructor.0 = abc` to `constructor.0.type = STRING` and 
`constructor.0.value = abc`.
2. Enrich the `DescriptorProperties` with more types.
3. Add a `getListProperties` method which returns all values under group-list 
formed keys, e.g., `constructor` will return all `constructor.#` entries.
4. Refine the descriptor APIs.

I know there is still a lot of work to do (e.g., adding more tests, documenting 
the features, supporting array-formed parameters). I'll continue working on that 
if I still have some time before the deadline.
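
To make item 1 concrete, here is a minimal sketch of the normalization idea (the 
method name and signature below are only illustrative, not the actual 
`ClassTypeValidator` code):

```scala
object NormalizeSketch {

  // Rewrites a shorthand constructor property into its explicit typed form,
  // e.g. "constructor.0" -> "abc" becomes
  // "constructor.0.type" -> "STRING" and "constructor.0.value" -> "abc".
  def normalize(key: String, value: String): Map[String, String] =
    if (key.matches("""constructor\.\d+""")) {
      Map(s"$key.type" -> "STRING", s"$key.value" -> value)
    } else {
      Map(key -> value)
    }

  def main(args: Array[String]): Unit = {
    // prints: Map(constructor.0.type -> STRING, constructor.0.value -> abc)
    println(normalize("constructor.0", "abc"))
  }
}
```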


---


[GitHub] flink issue #6253: [FLINK-8094][Table API & SQL] Support other types for Exi...

2018-07-05 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6253
  
Hi @HeartSaVioR, really sorry for the late reply. The problem that has 
always confused me is how to configure the date format. Anyway, supporting 
the standard ISO format is a great first step. Thanks for your efforts, and 
thanks @fhueske for reviewing and merging this!


---


[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

2018-07-05 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6090
  
Hi @twalthr,  please give me one more day. I will commit the changes 
tomorrow. 😄 


---


[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

2018-06-27 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6090
  
Hi @twalthr, sorry for the delay. I've been quite busy with my graduation 
these weeks. Everything's finished now and I'll put these tasks back on track.


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-27 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r198485307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+  * Descriptor for a primitive type.
+  */
+class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
+
+  // TODO not sure if we should the BasicTypeInfo here
+  var typeInformation: BasicTypeInfo[T] = _
+  var value: T = _
+
+  def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
--- End diff --

Sounds reasonable. Thanks.


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-27 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r198484614
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ---
@@ -187,6 +206,7 @@ private static ClusterSpecification 
createClusterSpecification(CustomCommandLine
private final StreamExecutionEnvironment streamExecEnv;
private final TableEnvironment tableEnv;
 
+   @SuppressWarnings("unchecked")
--- End diff --

Oh, I forgot the wildcard `?`. Sorry about that.


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-27 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r198482931
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
+
+   private static final String FROM = "from";
+
+   private From from;
+
+   private UDFDescriptor(String name, From from) {
+   super(name);
+   this.from = from;
+   }
+
+   public From getFrom() {
+   return from;
+   }
+
+   /**
+* Create a UDF descriptor with the given config.
+*/
+   public static UDFDescriptor create(Map config) {
+   if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
+   throw new SqlClientException("The 'name' attribute of a 
function is missing.");
+   }
+
+   final Object name = 
config.get(FunctionValidator.FUNCTION_NAME());
+   if (!(name instanceof String) || ((String) name).length() <= 0) 
{
+   throw new SqlClientException("Invalid function name '" 
+ name + "'.");
+   }
--- End diff --

I see. Thanks for the explanation.


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r197005582
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+  * Descriptor for a primitive type.
+  */
+class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
+
+  // TODO not sure if we should the BasicTypeInfo here
+  var typeInformation: BasicTypeInfo[T] = _
+  var value: T = _
+
+  def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
--- End diff --

Yes, `TypeStringUtils` can parse every valid type information, and the 
user can specify the type via the API (e.g., `setType(Types.SHORT)`). I just 
wonder how the basic types supported by Java can be properly inferred from the 
config file (e.g., how to decide whether a parameter `1` is a byte, a short or 
an int). Although a short or a byte value can be represented with an int, that 
will affect the constructor lookup via Java reflection. Do you have any 
ideas for that?
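
To make the concern concrete, here is a small sketch (the `MyUdf` class is 
hypothetical) of how the inferred primitive type affects constructor lookup via 
reflection:

```scala
object ConstructorLookupSketch {

  // A hypothetical UDF whose only constructor takes a primitive short.
  class MyUdf(threshold: Short)

  def main(args: Array[String]): Unit = {
    val clazz = classOf[MyUdf]
    // Succeeds: the parameter value "1" was resolved as a short.
    println(clazz.getConstructor(classOf[Short]))
    // Would throw NoSuchMethodException: the same value resolved as an int
    // no longer matches the declared constructor.
    // clazz.getConstructor(classOf[Int])
  }
}
```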


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196998269
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.TableException
+import 
org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
+import scala.collection.JavaConversions._
+
+/**
+  * Validator for [[PrimitiveTypeDescriptor]].
+  */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+  override def validateWithPrefix(keyPrefix: String, properties: 
DescriptorProperties): Unit = {
+properties
+  .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", 
isOptional = false)
+properties
+  
.validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", 
isOptional = false, 1)
+  }
+}
+
+object PrimitiveTypeValidator {
+  val PRIMITIVE_TYPE = "type"
+  val PRIMITIVE_VALUE = "value"
+
+  def derivePrimitiveValue(keyPrefix: String, properties: 
DescriptorProperties): Any = {
+val typeInfo =
+  properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+val value = typeInfo match {
+  case basicType: BasicTypeInfo[_] =>
+basicType match {
+  case BasicTypeInfo.INT_TYPE_INFO =>
+properties.getInt(valueKey)
+  case BasicTypeInfo.LONG_TYPE_INFO =>
+properties.getLong(valueKey)
+  case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+properties.getDouble(valueKey)
+  case BasicTypeInfo.STRING_TYPE_INFO =>
+properties.getString(valueKey)
+  case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+properties.getBoolean(valueKey)
+  //TODO add more types
--- End diff --

Ah, yes. The array type was not considered. I'll think about that and add 
the support if it's not hard to implement. Otherwise, we could defer it to a 
follow-up issue.
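
If we do add it, one possible direction (purely a sketch, assuming a hypothetical 
comma-separated value format in the config) would be to parse array-typed 
parameters from their string representation:

```scala
object ArrayParameterSketch {

  // Hypothetical format: an INT[] parameter given as "1, 2, 3" in the config.
  def deriveIntArray(raw: String): Array[Integer] =
    raw.split(",").map(s => Integer.valueOf(s.trim))

  def main(args: Array[String]): Unit = {
    // prints: 1, 2, 3
    println(deriveIntArray("1, 2, 3").mkString(", "))
  }
}
```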


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196996876
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A bunch of UDFs for SQL-Client test.
+ */
+public class UserDefinedFunctions {
--- End diff --

The testing functions provided in `flink-table` are mainly intended for 
functional tests, while the functions added here are mainly intended for 
testing construction, i.e., I made the UDF constructors complex or even 
sort of unreasonable...
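
For example, a sketch of such a constructor-focused test function (the class below 
is hypothetical, not the exact one in this PR) could look like this, where the 
interesting part is the constructor signature rather than the evaluation logic:

```scala
import org.apache.flink.table.functions.ScalarFunction

// A hypothetical scalar UDF whose constructor mixes parameter types, so that
// instantiating it from descriptor properties exercises constructor matching.
class PrefixedOffsetUdf(offset: Int, prefix: String, strict: Boolean)
  extends ScalarFunction {

  def eval(value: Int): String = {
    val shifted = value + offset
    if (strict && shifted < 0) {
      throw new IllegalArgumentException("negative result")
    }
    s"$prefix$shifted"
  }
}
```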


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196996002
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 ---
@@ -145,6 +146,68 @@ public void testTableSchema() throws Exception {
assertEquals(expectedTableSchema, actualTableSchema);
}
 
+   @Test(timeout = 30_000L)
+   public void testScalarUDF() throws Exception {
--- End diff --

I'll add more test cases for that.


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196995942
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ---
@@ -187,6 +206,7 @@ private static ClusterSpecification 
createClusterSpecification(CustomCommandLine
private final StreamExecutionEnvironment streamExecEnv;
private final TableEnvironment tableEnv;
 
+   @SuppressWarnings("unchecked")
--- End diff --

`AggregateFunction` and `TableFunction` take generic type parameters. 
Thus, when doing conversions (e.g., `streamTableEnvironment.registerFunction(k, 
(AggregateFunction) udf);`), the Java compiler complains about the unchecked cast.


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196995241
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ---
@@ -101,6 +110,16 @@ public ExecutionContext(Environment 
defaultEnvironment, SessionContext sessionCo
}
});
 
+   // generate user-defined functions
+   functions = new HashMap<>();
+   mergedEnv.getFunctions().forEach((name, descriptor) -> {
+   DescriptorProperties properties = new 
DescriptorProperties(true);
+   descriptor.addProperties(properties);
+   functions.put(
+   name,
+   
FunctionValidator.generateUserDefinedFunction(properties, classLoader));
--- End diff --

That's a good idea.


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196995191
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
--- End diff --

I'm not sure whether more descriptors (for other purposes) will extend 
`FunctionDescriptor` in the future. How about renaming it to 
`UserDefinedFunction`?


---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-06-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196994767
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
+
+   private static final String FROM = "from";
+
+   private From from;
+
+   private UDFDescriptor(String name, From from) {
+   super(name);
+   this.from = from;
+   }
+
+   public From getFrom() {
+   return from;
+   }
+
+   /**
+* Create a UDF descriptor with the given config.
+*/
+   public static UDFDescriptor create(Map config) {
+   if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
+   throw new SqlClientException("The 'name' attribute of a 
function is missing.");
+   }
+
+   final Object name = 
config.get(FunctionValidator.FUNCTION_NAME());
+   if (!(name instanceof String) || ((String) name).length() <= 0) 
{
+   throw new SqlClientException("Invalid function name '" 
+ name + "'.");
+   }
--- End diff --

At first, I did use `ConfigUtil.normalizeYaml` for that. But I found the 
type information for some parameters was lost in this process, e.g., we could 
not tell whether a `false` was a boolean or a string.


---


[GitHub] flink issue #5660: [FLINK-8861] [table] Add support for batch queries in SQL...

2018-06-11 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5660
  
Thanks for the improvements, @twalthr.


---


[GitHub] flink issue #5660: [FLINK-8861] [table] Add support for batch queries in SQL...

2018-06-10 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5660
  
Hi @twalthr, the PR has been reworked. Please take a look when it's convenient 
for you. Thanks!


---


[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-06-10 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5660#discussion_r194250975
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
 ---
@@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, 
String binding) {
case PREV:
gotoPreviousPage();
break;
+   case FIRST:
--- End diff --

From the function's point of view, that's true. I added it just for 
symmetry 😃.


---


[GitHub] flink pull request #6106: [hotfix][table] Remove a println statement

2018-05-31 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/6106


---


[GitHub] flink issue #6106: [hotfix][table] Remove a println statement

2018-05-31 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6106
  
Merging this.


---


[GitHub] flink issue #6106: [hotfix][table] Remove a println statement

2018-05-31 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6106
  
It's my fault. Sorry about that :see_no_evil:


---


[GitHub] flink pull request #6106: [hotfix][table] Remove a println statement

2018-05-31 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/6106

[hotfix][table] Remove a println statement

## What is the purpose of the change

Remove a `println` statement in `TimeBoundedStreamJoin`.

## Brief change log

Remove a `println` statement in `TimeBoundedStreamJoin`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (N/A)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6106.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6106


commit 060d225433f0fe4f5818e19cb500b75bb79ae60d
Author: Xingcan Cui 
Date:   2018-05-31T11:05:53Z

[hotfix] [table] Remove a println statement




---


[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-05-28 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/6090

[FLINK-8863] [SQL] Add user-defined function support in SQL Client

## What is the purpose of the change

This PR aims to add user-defined function (ScalarFunction, TableFunction 
and AggregateFunction) support to the SQL Client.

## Brief change log

  - Introduce a new `HierarchyDescriptor` and its corresponding validator 
`HierarchyDescriptorValidator`, which allow constructing descriptors 
hierarchically.
  - Add a `PrimitiveTypeDescriptor` to describe a primitive type value and 
a `ClassTypeDescriptor` to describe a class type value. A `ClassTypeDescriptor` 
contains a `constructor` field, which is composed of a list of 
`PrimitiveTypeDescriptor` or `ClassTypeDescriptor`.
  - Add a `UDFDescriptor` and its base class `FunctionDescriptor` to 
describe a `UserDefinedFunction`. Given a `DescriptorProperties`, a 
`UserDefinedFunction` can be instantiated with the 
`FunctionValidator.generateUserDefinedFunction()` method.
  - Add related tests for the new components.
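
For illustration, a minimal sketch of how a function instance is derived from its 
descriptor (class and method names follow this PR; the exact signatures and the 
return type are assumptions based on the diff):

```scala
import org.apache.flink.table.descriptors.{DescriptorProperties, FunctionDescriptor, FunctionValidator}
import org.apache.flink.table.functions.UserDefinedFunction

object UdfInstantiationSketch {

  // Collect the descriptor's properties and let the validator instantiate the
  // user-defined function reflectively with the given class loader.
  def createUdf(descriptor: FunctionDescriptor, classLoader: ClassLoader): UserDefinedFunction = {
    val properties = new DescriptorProperties(true)
    descriptor.addProperties(properties)
    FunctionValidator.generateUserDefinedFunction(properties, classLoader)
  }
}
```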

## Verifying this change

The change can be verified with the test cases added in 
`LocalExecutorITCase.java`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (*the doc has not been finished 
yet*)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8863-udf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6090


commit c6c7d63f96eda01e24735271859ea8528e229021
Author: Xingcan Cui 
Date:   2018-05-27T15:36:25Z

[FLINK-8863] Add user-defined function support in SQL Client




---


[GitHub] flink issue #5660: [FLINK-8861] [table] Add support for batch queries in SQL...

2018-05-17 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5660
  
Sure @twalthr, I'll rebase the PR soon.


---


[GitHub] flink pull request #6027: [FLINK-7814][TableAPI && SQL] Add BETWEEN and NOT ...

2018-05-17 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6027#discussion_r188891227
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -809,6 +809,23 @@ trait ImplicitExpressionOperations {
 */
   def sha2(hashLength: Expression) = Sha2(expr, hashLength)
 
+  /**
+* Returns the Between with lower bound and upper bound.
+* @param lowerBound
+* @param upperBound
+* @return Returns the Between with lower bound and upper bound
+*/
+  def between(lowerBound: Expression, upperBound: Expression) =
+Between(expr, lowerBound, upperBound)
+
+  /**
+* Returns the not Between with lower bound and upper bound.
--- End diff --

Same as the comment above.


---


[GitHub] flink pull request #6027: [FLINK-7814][TableAPI && SQL] Add BETWEEN and NOT ...

2018-05-17 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6027#discussion_r188891077
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -809,6 +809,23 @@ trait ImplicitExpressionOperations {
 */
   def sha2(hashLength: Expression) = Sha2(expr, hashLength)
 
+  /**
+* Returns the Between with lower bound and upper bound.
--- End diff --

Add a new line here and describe whether the bound values are included in 
the interval.
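
A possible wording (just a suggestion, assuming the implementation follows SQL's 
inclusive BETWEEN semantics) could be:

```scala
/**
  * Returns true if the given expression is between lowerBound and upperBound
  * (both inclusive), i.e. equivalent to `expr >= lowerBound && expr <= upperBound`.
  *
  * @param lowerBound numeric or comparable expression, included in the range
  * @param upperBound numeric or comparable expression, included in the range
  * @return boolean expression
  */
def between(lowerBound: Expression, upperBound: Expression) =
  Between(expr, lowerBound, upperBound)
```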


---


[GitHub] flink pull request #6027: [FLINK-7814][TableAPI && SQL] Add BETWEEN and NOT ...

2018-05-17 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6027#discussion_r188894442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala
 ---
@@ -105,3 +105,75 @@ case class If(
 }
   }
 }
+
+abstract class BetweenProperty(
+  expr: Expression,
+  lowerBound: Expression,
+  upperBound: Expression) extends Expression {
+
+  override private[flink] def resultType: TypeInformation[_] = 
BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override private[flink] def children: Seq[Expression] = Seq(expr, 
lowerBound, upperBound)
+
+  override private[flink] def validateInput(): ValidationResult = {
+(expr.resultType, lowerBound.resultType, upperBound.resultType) match {
+  case (exprType, lowerType, upperType)
+if isNumeric(exprType) && isNumeric(lowerType) && 
isNumeric(upperType)
+  => ValidationSuccess
+  case (exprType, lowerType, upperType)
+if isComparable(exprType) && exprType == lowerType && exprType == 
upperType
+  => ValidationSuccess
+  case (exprType, lowerType, upperType) =>
+ValidationFailure(
+  s"Between is only supported for numeric types and " +
+s"comparable types of same type, got $exprType, $lowerType and 
$upperType"
--- End diff --

Between is only supported for numeric types and identical comparable types, 
but got $exprType, $lowerType and $upperType.


---


[GitHub] flink pull request #6027: [FLINK-7814][TableAPI && SQL] Add BETWEEN and NOT ...

2018-05-17 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6027#discussion_r188895005
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
 ---
@@ -406,5 +406,26 @@ class ScalarOperatorsTest extends 
ScalarOperatorsTestBase {
   "true) === true) || false).cast(STRING) + 'X ').trim",
   "trueX")
 testTableApi(12.isNull, "12.isNull", "false")
+
+// between
+testTableApi(2.between(1, 3), "2.BETWEEN(1, 3)", "true")
--- End diff --

It would be better to add some tests for the boundaries here.
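
For instance (a sketch only, assuming inclusive semantics and mirroring the string 
form used above):

```scala
testTableApi(1.between(1, 3), "1.BETWEEN(1, 3)", "true")   // lower bound included
testTableApi(3.between(1, 3), "3.BETWEEN(1, 3)", "true")   // upper bound included
testTableApi(0.between(1, 3), "0.BETWEEN(1, 3)", "false")  // just below the range
```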


---


[GitHub] flink pull request #6027: [FLINK-7814][TableAPI && SQL] Add BETWEEN and NOT ...

2018-05-17 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6027#discussion_r188895939
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
 ---
@@ -406,5 +406,26 @@ class ScalarOperatorsTest extends 
ScalarOperatorsTestBase {
   "true) === true) || false).cast(STRING) + 'X ').trim",
   "trueX")
 testTableApi(12.isNull, "12.isNull", "false")
+
--- End diff --

I think we can use a separate test method for `Between`. Besides, some 
validation tests can be added to 
`org.apache.flink.table.expressions.validation.ScalarOperatorsValidationTest`.
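
A sketch of such a validation test (assuming the usual `@Test(expected = ...)` 
pattern of the existing validation tests; the method name is illustrative):

```scala
@Test(expected = classOf[ValidationException])
def testBetweenWithIncompatibleTypes(): Unit = {
  // a boolean expression between numeric bounds should fail validation
  testTableApi(true.between(1, 3), "true.BETWEEN(1, 3)", "true")
}
```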


---


[GitHub] flink pull request #5210: [FLINK-8316] [table] The CsvTableSink and the CsvI...

2018-05-13 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5210


---


[GitHub] flink pull request #6003: [FLINK-9289] Parallelism of generated operators sh...

2018-05-13 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/6003

[FLINK-9289] Parallelism of generated operators should have max parallelism 
of input

## What is the purpose of the change

This PR aims to fix the default parallelism problem for the generated 
key-extraction mapper whose input is a union operator without an explicitly 
set parallelism in the batch environment.

## Brief change log

  - When creating a `Union` operator, automatically set its parallelism to 
the maximum parallelism of its inputs.
  - Forbid the user to set parallelism for the union operator manually.
  - Add some test cases in `UnionOperatorTest.java` and 
`UnionTranslationTest.java`.
  - Adjust the results for `testUnionWithoutExtended()` and 
`testUnionWithExtended()` in `org.apache.flink.table.api.batch.ExplainTest`.
  - Remove the parallelism setting code for union in 
`PythonPlanBinder.java` and `PageRank.java`.
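
As a usage-level illustration of the intended behaviour (a sketch with arbitrary 
parallelism values), consider the batch Scala API:

```scala
import org.apache.flink.api.scala._

object UnionParallelismExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val a = env.fromElements(1, 2, 3).map(x => x).setParallelism(4)
    val b = env.fromElements(4, 5, 6).map(x => x).setParallelism(2)

    // After this change, the union (and the key-extraction mapper generated
    // for the following groupBy) should run with parallelism max(4, 2) = 4
    // instead of the environment's default parallelism.
    a.union(b).groupBy(x => x % 2).reduce(_ + _).print()
  }
}
```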

## Verifying this change

The change can be verified by the added test cases in 
`UnionOperatorTest.java` and `UnionTranslationTest.java`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-9289-parallelism

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6003.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6003


commit 35be0811ef0a5e6c572d0a60160fa18c3b6afefa
Author: Xingcan Cui <xingcanc@...>
Date:   2018-05-13T12:20:36Z

[FLINK-9289] Parallelism of generated operators should have max parallism 
of input




---


[GitHub] flink issue #5610: [FLINK-8537][table]Add a Kafka table source factory with ...

2018-04-23 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5610
  
Thanks for the review @twalthr. 😄 


---


[GitHub] flink issue #5660: [FLINK-8861] [table] Add support for batch queries in SQL...

2018-03-15 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5660
  
I see, @twalthr. Sorry for my impatience.


---


[GitHub] flink issue #5660: [FLINK-8861] [table] Add support for batch queries in SQL...

2018-03-14 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5660
  
Hi @twalthr and @fhueske, the PR has been rebased. I wonder if you could 
help review it when it's convenient for you. Thanks~


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-14 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174511059
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+  private val namePrefix: String,
--- End diff --

Use a four-space indent for parameter declarations (this also applies to 
methods).


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339296
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  /** Return a deep copy of the [[TableSink]]. */
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
--- End diff --

The docs for overridden methods could be omitted.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
--- End diff --

Use the full class name for `[[Table]]` since it's not imported.
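
For example:

```scala
/**
  * Configures the unique key fields of the [[org.apache.flink.table.api.Table]] to write.
  * The method is called after [[TableSink.configure()]].
  */
```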


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338993
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
--- End diff --

Remove unused imports.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
--- End diff --

Format the code like that.
```
class QueryableTableSink(
private val namePrefix: String,
private val queryConfig: StreamQueryConfig)
  extends UpsertStreamTableSink[Row]
...
```



---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339810
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
+
+val testHarness = createHarnessTester(operator,
+  new RowKeySelector(Array(0), keyType),
+  keyType)
+
+testHarness.open()
+
+
--- End diff --

Remove extra blank lines.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339658
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
--- End diff --

Try to avoid using deprecated classes/methods.
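
For instance, if the function were a `KeyedProcessFunction`, the harness could create the operator without the legacy class (just a sketch of the idea, assuming such a rewrite of `func`):

```
// Sketch (assumes `func` has been rewritten as a
// KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]):
import org.apache.flink.streaming.api.operators.KeyedProcessOperator

val operator = new KeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
```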


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338955
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
--- End diff --

This line is too long (should be less than 100 characters).


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338947
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
--- End diff --

This `process(processFunction)` method has been deprecated. Replace it with 
`process(KeyedProcessFunction)`.
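
Just to make this concrete, a minimal sketch of what I mean (the class name `QueryableStateKeyedProcessFunction` is only a placeholder, not from this PR):

```
import java.lang.{Boolean => JBool}

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Placeholder sketch: extend KeyedProcessFunction instead of ProcessFunction so
// that the non-deprecated KeyedStream.process(KeyedProcessFunction) overload is used.
class QueryableStateKeyedProcessFunction
  extends KeyedProcessFunction[Row, JTuple2[JBool, Row], Void] {

  override def processElement(
      value: JTuple2[JBool, Row],
      ctx: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]#Context,
      out: Collector[Void]): Unit = {
    // write value.f1 into the queryable state here
  }
}

// usage (names as in the PR):
// dataStream
//   .keyBy(new RowKeySelector(keyIndices, keySelectorType))
//   .process(new QueryableStateKeyedProcessFunction)
```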


---


[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

2018-03-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173888555
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
 ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
// construct table source using a builder
 
final Map<String, String> tableJsonMapping = new HashMap<>();
+   tableJsonMapping.put("name", "name");
--- End diff --

```
physical schema ==mapping=> "intermediate schema" ==timestamp extraction and projection=> logical schema
```
Maybe we should consider eliminating the "intermediate schema" in the future?


---


[GitHub] flink issue #5610: [FLINK-8537][table]Add a Kafka table source factory with ...

2018-03-12 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5610
  
Hi @twalthr, I've rebased this PR and fixed some problems.


---


[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

2018-03-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173879267
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
 ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
// construct table source using a builder
 
final Map<String, String> tableJsonMapping = new HashMap<>();
+   tableJsonMapping.put("name", "name");
--- End diff --

Well, according to the current implementation, you are right. But I still 
feel uncomfortable about that since we actually mix the physical schema (format 
schema) and the logical schema (table schema) into the same map. Do you think 
it's necessary to make some changes here?


---


[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

2018-03-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173860901
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
 ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
// construct table source using a builder
 
final Map<String, String> tableJsonMapping = new HashMap<>();
+   tableJsonMapping.put("name", "name");
--- End diff --

This "name" to "name" mapping should not exist since we've already 
explicitly defined the "fruit-name" to "name" mapping.


---


[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

2018-03-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173857218
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -148,6 +148,13 @@ object SchemaValidator {
 
 val schema = properties.getTableSchema(SCHEMA)
 
+// add all source fields first because rowtime might reference one of 
them
+toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --

I think we should first remove the implicitly added source field before adding the explicit mapping, as marked in the following snippet.
```
// add explicit mapping
case Some(source) =>
// should add mapping.remove(source)
mapping.put(name, source)
```
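
To illustrate the effect standalone (not the actual `SchemaValidator` code; field names taken from the Kafka JSON test):

```
import scala.collection.mutable

// All source fields are first registered as implicit identity mappings...
val mapping = mutable.Map[String, String]()
Seq("name", "count", "time").foreach(f => mapping.put(f, f))

// ...so an explicit mapping like "fruit-name" -> "name" should first remove the
// implicit "name" -> "name" entry before being added.
val (name, source) = ("fruit-name", "name")
mapping.remove(source)
mapping.put(name, source)

// mapping now contains: "fruit-name" -> "name", "count" -> "count", "time" -> "time"
```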


---


[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

2018-03-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173791129
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -148,6 +148,13 @@ object SchemaValidator {
 
 val schema = properties.getTableSchema(SCHEMA)
 
+// add all source fields first because rowtime might reference one of 
them
+toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --

Not really. I was just refactoring #5610. For convenience, I used the existing class `org.apache.flink.formats.avro.generated.User` in a test case, but it has so many fields to be mapped. 😄 


---


[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

2018-03-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173784883
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -148,6 +148,13 @@ object SchemaValidator {
 
 val schema = properties.getTableSchema(SCHEMA)
 
+// add all source fields first because rowtime might reference one of 
them
+toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --

Hi @twalthr, can we check the used `TimestampExtractor` here? Specifically, if it's an `ExistingField`, we only include the target fields; if it's a `StreamRecordTimestamp`, we don't include extra fields; and only if it's a custom extractor do we include all the source fields.
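
In code, the check I have in mind would look roughly like this (only a sketch to illustrate the idea, not part of this PR):

```
import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}

// Sketch: decide which source fields must stay in the mapping, depending on the
// configured timestamp extractor.
def fieldsRequiredByExtractor(
    extractor: TimestampExtractor,
    allSourceFields: Seq[String]): Seq[String] = extractor match {
  case e: ExistingField =>
    // only the field(s) the extractor actually reads
    e.getArgumentFields.toSeq
  case _: StreamRecordTimestamp =>
    // the timestamp comes from the StreamRecord itself, no extra field needed
    Seq.empty
  case _ =>
    // custom extractor: we cannot know what it accesses, keep all source fields
    allSourceFields
}
```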


---


[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...

2018-03-09 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5662
  
Thanks for the explanation, @twalthr! I'll update the PR and resolve the resulting conflicts.


---


[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-03-07 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5660

[FLINK-8861] [table] Add support for batch queries in SQL Client

## What is the purpose of the change

This PR added support for batch queries in SQL Client.

## Brief change log

 - Added a `StaticResult` and a `BatchResult` for the batch query results.
 - Added related methods to `ResultStore` for static results and renamed 
the existing methods with a prefix "dynamic".
 - Added the logic for retrieving batch query results using `Dataset.collect()`.
 - Adapted the viewing logic for static results to a "two-phase" table 
result view.
 - Added the first-page option to `CliTableResultView.java`.
 - Replaced some default values with `""` in `Execution.java`.

## Verifying this change

This change can be verified by the added test case 
`testBatchQueryExecution()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5660.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5660


commit 85225d504114fe80b1dc6cd85cb5e3daf1a55d36
Author: Xingcan Cui <xingcanc@...>
Date:   2018-03-07T17:12:55Z

[FLINK-8861][table]Add support for batch queries in SQL Client




---


[GitHub] flink pull request #5659: [FLINK-8661] [table] Add support for batch queries...

2018-03-07 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5659


---


[GitHub] flink pull request #5659: [FLINK-8661] [table] Add support for batch queries...

2018-03-07 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5659

[FLINK-8661] [table] Add support for batch queries in SQL Client

## What is the purpose of the change

This PR added support for batch queries in SQL Client.

## Brief change log

 - Added a `StaticResult` and a `BatchResult` for the batch query results.
 - Added related methods to `ResultStore` for static results and renamed 
the existing methods with a prefix "dynamic".
 - Added the logic for retrieving batch query results using `Dataset.collect()`.
 - Adapted the viewing logic for static results to a "two-phase" table 
result view.
 - Added the first-page option to `CliTableResultView.java`.
 - Replaced some default values with `""` in `Execution.java`.

## Verifying this change

This change can be verified by the added test case 
`testBatchQueryExecution()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5659.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5659


commit 8f21a38dd60ab8c41893ec8c52bfcc47fc54648e
Author: Xingcan Cui <xingcanc@...>
Date:   2018-03-07T17:12:55Z

[FLINK-8661][table]Add support for batch queries in SQL Client




---


[GitHub] flink issue #5610: [FLINK-8537][table]Add a Kafka table source factory with ...

2018-03-01 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5610
  
Hi @twalthr @fhueske, I wonder if you could help review this PR when it is convenient for you.

Thanks, Xingcan 


---


[GitHub] flink pull request #5610: [FLINK-8537][table]Add a Kafka table source factor...

2018-03-01 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5610

[FLINK-8537][table]Add a Kafka table source factory with Avro format support

## What is the purpose of the change

This PR adds the Kafka table source factory with Avro format.

## Brief change log

### Adding
  - Add an `Avro` format descriptor and the corresponding validator.
  - Add a `KafkaAvroTableSourceFactory` and its version specific 
implementations.
  - Add a `KafkaTableSourceFactory` and make the json/avro factory extend 
it.
  - Add `equals()` and `hashCode()` methods to the timestamp/watermark 
extractors for testing.

### Updating
  - Change the schema configuration logic for `KafkaJsonTableSourceFactory`.
  - Add a `withFormat` flag to `KafkaJsonTableSourceFactory` test cases and 
replace the proctime attributes with the rowtime ones.
  - Move the `convertToRowTypeInformation()` method to a new helper class 
`AvroTypeInfoConverter`.

### Dependency
  - Make `flink-avro` depend on `flink-table` and `flink-streaming-scala`.
  - Remove the `flink-avro` dependency from `flink-table` and move the only affected test `AvroTypesITCase` to `flink-avro`.

## Verifying this change

This change can be verified by the test cases in `AvroTest` and 
`KafkaAvroTableSourceFactoryTestBase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8537

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5610


commit 99387fcd699267bbf5f778e5389060eae5420a4e
Author: Xingcan Cui <xingcanc@...>
Date:   2018-03-01T11:21:05Z

[FLINK-8537][table]Add a Kafka table source factory with Avro format support




---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-28 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r171354820
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_NAME = "name"
+  val SCHEMA_TYPE = "type"
+  val SCHEMA_PROCTIME = "proctime"
+  val SCHEMA_FROM = "from"
+
+  // utilities
+
+  /**
+* Finds the proctime attribute if defined.
+*/
+  def deriveProctimeAttribute(properties: DescriptorProperties): 
Optional[String] = {
+val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+for (i <- 0 until names.size) {
+  val isProctime = toScala(
+properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+  isProctime.foreach { isSet =>
+if (isSet) {
+  return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+}
+  }
+}
+toJava(None)
+  }
+
+  /**
+* Finds the rowtime attributes if defined.
+*/
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+: util.List[RowtimeAttributeDescriptor] = {
+
+val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+// check for rowtime in every field
+for (i <- 0 until names.size) {
+  RowtimeValidator
+.getRowtimeComponents(properties, s"$SCHEMA.$i.")
+.foreach { case (extractor, strategy) =>
+  // create descriptor
+  attributes += new RowtimeAttributeDescriptor(
+properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+extractor,
+strategy)
+}
+}
+
+attributes.asJava
+  }
+
+  /**
+* Finds a table source field mapping.
+*/
+  def deriveFieldMapping(
+  properties: DescriptorProperties,
+  sourceSchema: Optional[TableSchema])
+: util.Map[String, String] = {
+
+val mapping = mutable.Map[String, String]()
+
+val schema = properties.getTableSchema(SCHEMA)
+
+// add all schema fields first for implicit mappings
+schema.getColumnNames.foreach { name =>
+  mapping.put(name, name)
+}
+
+val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+for (i <- 0 until names.size) {
+  val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+  toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) 
match {
 
-  // per column properties
+// add explicit mapping
+case Some(source) =>
+  mapping.put(name, source)
 
-  val NAME = "name"
-  val TYPE = "type"
-  val PROCTIME = "proctime"
-  val PROCTIME_VALUE_TRUE = "true"
-  val FROM = "from"
+// implicit mapping or time
+case None =>
+  val isProctime = properties
+.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+.orElse(false)
+  val isRowtime = properties
+.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+  // remove proctime/rowtime from mapping
+  if (isProctime || isRowtime) {
+mapping.remove(name)
+  }
+  // check for invalid fields
+  else if (toScala(sourceSchema).forall(s => 
!s.getColumnNames.contains(name))) {
+throw new ValidationException(s"Could not map the schema field 
'$name' to a field " +
+  s"from source. Please specify the source field from which it 
can be derived.")
+  }
+  }
+}
 
+mapping.toMap.asJava
+  }
+
+  /**
+* Finds the fields that can be used for a format schema (without time 
attributes).
+*/
+  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
--- End diff --

Thanks for your explanation @twalthr. I totally agree that we should avoid letting the users define schemas multiple times. Since the names and definitions still confuse me, I'd like to share my understanding to see if it's correct. Let's take the KafkaJsonTableSource as an example. Briefly, the schema mapping can be illustrated with

```
json-format-schema

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-28 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r171280725
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_NAME = "name"
+  val SCHEMA_TYPE = "type"
+  val SCHEMA_PROCTIME = "proctime"
+  val SCHEMA_FROM = "from"
+
+  // utilities
+
+  /**
+* Finds the proctime attribute if defined.
+*/
+  def deriveProctimeAttribute(properties: DescriptorProperties): 
Optional[String] = {
+val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+for (i <- 0 until names.size) {
+  val isProctime = toScala(
+properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+  isProctime.foreach { isSet =>
+if (isSet) {
+  return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+}
+  }
+}
+toJava(None)
+  }
+
+  /**
+* Finds the rowtime attributes if defined.
+*/
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+: util.List[RowtimeAttributeDescriptor] = {
+
+val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+// check for rowtime in every field
+for (i <- 0 until names.size) {
+  RowtimeValidator
+.getRowtimeComponents(properties, s"$SCHEMA.$i.")
+.foreach { case (extractor, strategy) =>
+  // create descriptor
+  attributes += new RowtimeAttributeDescriptor(
+properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+extractor,
+strategy)
+}
+}
+
+attributes.asJava
+  }
+
+  /**
+* Finds a table source field mapping.
+*/
+  def deriveFieldMapping(
+  properties: DescriptorProperties,
+  sourceSchema: Optional[TableSchema])
+: util.Map[String, String] = {
+
+val mapping = mutable.Map[String, String]()
+
+val schema = properties.getTableSchema(SCHEMA)
+
+// add all schema fields first for implicit mappings
+schema.getColumnNames.foreach { name =>
+  mapping.put(name, name)
+}
+
+val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+for (i <- 0 until names.size) {
+  val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+  toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) 
match {
 
-  // per column properties
+// add explicit mapping
+case Some(source) =>
+  mapping.put(name, source)
 
-  val NAME = "name"
-  val TYPE = "type"
-  val PROCTIME = "proctime"
-  val PROCTIME_VALUE_TRUE = "true"
-  val FROM = "from"
+// implicit mapping or time
+case None =>
+  val isProctime = properties
+.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+.orElse(false)
+  val isRowtime = properties
+.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+  // remove proctime/rowtime from mapping
+  if (isProctime || isRowtime) {
+mapping.remove(name)
+  }
+  // check for invalid fields
+  else if (toScala(sourceSchema).forall(s => 
!s.getColumnNames.contains(name))) {
+throw new ValidationException(s"Could not map the schema field 
'$name' to a field " +
+  s"from source. Please specify the source field from which it 
can be derived.")
+  }
+  }
+}
 
+mapping.toMap.asJava
+  }
+
+  /**
+* Finds the fields that can be used for a format schema (without time 
attributes).
+*/
+  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
--- End diff --

Hi @twalthr, sorry for mentioning you again. I was a little confused about this method. Could you help explain its usage? Besides, the rowtime field should be an existing field in the input format. Why is it removed here?

Thanks, Xingcan


---


[GitHub] flink pull request #5505: [FLINK-8538][table]Add a Kafka table source factor...

2018-02-23 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5505


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170175473
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, 
String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, 
String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit 
= {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: 
String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafeRemove(removeProperty)
 validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  def addFormat(format: FormatDescriptor): TestTableSourceDescriptor = {
+this.formatDescriptor = Some(format)
+this
+  }
+
+  def addSchema(schema: Schema): TestTableSourceDescriptor = {
+this.schemaDescriptor = Some(schema)
+this
+  }
+
+  def getPropertyMap: java.util.Map[String, String] = {
+val props = new DescriptorProperties()
+connectorDescriptor.addProperties(props)
--- End diff --

`addProperties()` has been removed and this method `getPropertyMap()` seems 
to be useless now.


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170188551
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, 
String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, 
String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit 
= {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: 
String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafeRemove(removeProperty)
 validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
--- End diff --

` this.connectorDescriptor = Some(connector)`


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170181909
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
 ---
@@ -43,7 +42,7 @@ object TableSourceFactoryService extends Logging {
   def findTableSourceFactory(descriptor: TableSourceDescriptor): 
TableSource[_] = {
--- End diff --

Rename to `findAndCreateTableSource()`?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170175378
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, 
String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, 
String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit 
= {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: 
String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafeRemove(removeProperty)
 validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
--- End diff --

Too many arguments.


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170194561
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
 ---
@@ -27,13 +27,27 @@ class ConnectorDescriptorValidator extends 
DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
 properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 
1)
-properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, 
Integer.MAX_VALUE)
+properties.validateInt(CONNECTOR_PROPERTY_VERSION, isOptional = true, 
0, Integer.MAX_VALUE)
   }
 }
 
 object ConnectorDescriptorValidator {
 
+  /**
+* Key for describing the type of the connector. Usually used for 
factory discovery.
+*/
   val CONNECTOR_TYPE = "connector.type"
+
+  /**
+*  Key for describing the property version. This property can be used 
for backwards
--- End diff --

Two spaces here...


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170180103
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
+  key: String,
+  propertySets: JList[JMap[String, String]])
+: Unit = {
+checkNotNull(key)
+checkNotNull(propertySets)
+putIndexedVariableProperties(key, 
propertySets.asScala.map(_.asScala.toMap))
+  }
+
   // 
--
 
+  /**
+* Returns a string value under the given key if it exists.
+*/
   def getString(key: String): Option[String] = {
 properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-case Some(c) =>
-  if (c.length != 1) {
-throw new ValidationException(s"The value of $key must only 
contain one character.")
-  }
-  Some(c.charAt(0))
+  /**
+* Returns a string value under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalString(key: String): Optional[String] = 
toJava(getString(key))
 
-case None => None
+  /**
+* Returns a character value under the given key if it exists.
+*/
+  def getCharacter(key: String): Option[Character] = getString(key).map { 
c =>
+if (c.length != 1) {
+  throw new ValidationException(s"The value of $key must only contain 
one character.")
+}
+c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-case Some(b) => Some(JBoolean.parseBoolean(b))
-
-case None => None
+  /**
+* Returns a class value under the given key if it exists.
+*/
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
--- End diff --

It seems we also need a `getOptionalClass()` wrapper for it. Additionally, 
I think it's a little strange to provide the superclass as a parameter here.
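
The wrapper I'm thinking of would simply mirror the `getOptionalString` pattern above (untested sketch):

```
/**
  * Returns a class value under the given key if it exists.
  * This method is intended for Java code.
  */
def getOptionalClass[T](key: String, superClass: Class[T]): Optional[Class[T]] =
  toJava(getClass(key, superClass))
```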


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170191830
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.formats.json.JsonSchemaConverter;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.Json;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactory;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KafkaJsonTableSourceFactory}.
+ */
+public abstract class KafkaJsonTableSourceFactoryTestBase {
+
+   private static final String JSON_SCHEMA =
+   "{" +
+   "  'title': 'Fruit'," +
+   "  'type': 'object'," +
+   "  'properties': {" +
+   "'name': {" +
+   "  'type': 'string'" +
+   "}," +
+   "'count': {" +
+   "  'type': 'integer'" +
+   "}," +
+   "'time': {" +
+   "  'description': 'Age in years'," +
+   "  'type': 'number'" +
+   "}" + "  }," +
+   "  'required': ['name', 'count', 'time']" +
+   "}";
+
+   private static final String TOPIC = "test-topic";
+
+   protected abstract String version();
+
+   protected abstract KafkaJsonTableSource.Builder builder();
+
+   protected abstract KafkaJsonTableSourceFactory factory();
+
+   @Test
+   public void testResultingTableSource() {
+
+   // construct table source using a builder
+
+   final Map<String, String> tableJsonMapping = new HashMap<>();
+   tableJsonMapping.put("fruit-name", "name");
+   tableJsonMapping.put("count", "count");
+   tableJsonMapping.put("event-time", "time");
+
+   final Properties props = new Properties();
+   props.put("group.id", "test-group");
+   props.put("bootstrap.servers", "localhost:1234");
+
+   final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
+
+   final KafkaTableSource builderSource = builder()
+   
.forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(JSON_SCHEMA)))
+   .failOnMissingField(true)
+   .withTableToJsonMapping(tableJsonMapping)
+   .withKafkaProperties(props)
+   .forTopic(TOPIC)
+   .fromSpecificOffsets(specificOffsets)
+   .withSchema(
+   Tab

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170185039
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * The validator for {@link Kafka}.
+ */
+public class KafkaValidator extends ConnectorDescriptorValidator {
+
+   public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
+   public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
+   public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
+   public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
+   public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
+   public static final String CONNECTOR_TOPIC = "connector.topic";
+   public static final String CONNECTOR_STARTUP_MODE = 
"connector.startup-mode";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = 
"earliest-offset";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = 
"latest-offset";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = 
"group-offsets";
+   public static final String 
CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS = 
"connector.specific-offsets";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = 
"partition";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
+   public static final String CONNECTOR_PROPERTIES = 
"connector.properties";
--- End diff --

If we take the required properties (e.g., bootstrap.servers, group.id) as common ones here, the validation logic is pushed down to the underlying components, right?


---


[GitHub] flink issue #5505: [FLINK-8538][table]Add a Kafka table source factory with ...

2018-02-17 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5505
  
Yes @twalthr, that will be great! I'll start working on the 
`KafkaAvroTableSourceFactory` and keep an eye on the API refactorings.


---


[GitHub] flink pull request #5505: [FLINK-8538][table]Add a Kafka table source factor...

2018-02-16 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5505

[FLINK-8538][table]Add a Kafka table source factory with JSON format support

## What is the purpose of the change

This PR adds Kafka JSON table source factories for different Kafka versions.

## Brief change log

- Adds a `Kafka` connector descriptor and a corresponding `KafkaValidator`.
- Adds a `KafkaJsonTableSourceFactory` and different version specific 
implementations.
- Adds a method to get the column numbers in `TableSchema`.
- Adds `equals()` and `hashCode()` methods for `KafkaTableSource` and 
`KafkaJsonTableSource`.

**Note:** the rowtime setting has not been implemented yet as I think a 
more friendly API to get the rowtime attributes should be provided in 
`DescriptorProperties`.

## Verifying this change

This change can be verified by the tests added in `KafkaJsonTableFromDescriptorTestBase` and the other sub-classes. However, they are temporarily commented out.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (**yes**)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8538

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5505


commit 3da847507b65047a3fd02058596f6f712a9332de
Author: Xingcan Cui <xingcanc@...>
Date:   2018-02-12T10:11:36Z

[FLINK-8538][table]Add a Kafka table source factory with JSON format support




---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168446783
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
 ---
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link 
Row} for representing
+ * objects and tuple arrays.
+ *
+ * Note: This converter implements just a subset of the JSON schema 
specification.
--- End diff --

It seems that the JSON Schema is still evolving. Shall we consider 
specifying a version for that?


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168439090
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from JSON to Flink types.
+ *
+ * Deserializes the byte[] messages as a JSON object and 
reads
--- End diff --

messages -> message


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168449398
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a 
JSON bytes.
+ *
+ * Serializes the input Flink object into a JSON string and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link 
JsonRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class JsonRowSerializationSchema implements 
SerializationSchema<Row> {
+
+   private static final long serialVersionUID = -2885556750743978636L;
+
+   /** Type information describing the input type. */
+   private final TypeInformation<Row> typeInfo;
+
+   /** Object mapper that is used to create output JSON objects. */
+   private final ObjectMapper mapper = new ObjectMapper();
+
+   /** Formatter for RFC 3339-compliant string representation of a time 
value (with UTC timezone, without milliseconds). */
+   private SimpleDateFormat timeFormat = new 
SimpleDateFormat("HH:mm:ss'Z'");
+
+   /** Formatter for RFC 3339-compliant string representation of a time 
value (with UTC timezone). */
+   private SimpleDateFormat timeFormatWithMillis = new 
SimpleDateFormat("HH:mm:ss.SSS'Z'");
+
+   /** Formatter for RFC 3339-compliant string representation of a 
timestamp value (with UTC timezone). */
+   private SimpleDateFormat timestampFormat = new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+   /** Reusable object node. */
+   private transient ObjectNode node;
+
+   /**
+* Creates a JSON serialization schema for the given type information.
+*
+* @param typeInfo The field names of {@link Row} are used to map to 
JSON properties.
+*/
+   public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
+   Preconditions.checkNotNull(typeInfo, "Type information");
+   this.typeInfo = typeInfo;
+   }
+
+   /**
+* Creates a JSON serialization schema for the given JSON schema.
+*
+* @param jsonSchema JSON schema describing the result type
+*
+* @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+*/
+   public JsonRowSerializationSchema(String jsonSchema) {
+   this(JsonSchemaConverter.convert(jsonSchema));
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
--- End diff --

The `serialize()` method is also not thread-safe since it initializes and reuses the shared `node` field.
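
For illustration only, a minimal sketch of a re-entrant alternative that creates the node per call instead of reusing a shared field; the class, its fields, and the unshaded Jackson imports are assumptions made to keep the sketch self-contained:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class PerCallJsonSerializationSketch {

        // ObjectMapper is safe to share across threads once configured.
        private final ObjectMapper mapper = new ObjectMapper();

        public byte[] serialize(String name, long count) {
            // A fresh node per call removes the shared mutable state.
            ObjectNode node = mapper.createObjectNode();
            node.put("name", name);
            node.put("count", count);
            try {
                return mapper.writeValueAsBytes(node);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Could not serialize record.", e);
            }
        }
    }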


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168438157
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an 
ObjectNode.
+ *
+ * Fields can be accessed by calling 
objectNode.get(<name>).as(<type>)
+ */
+@PublicEvolving
+public class JsonNodeDeserializationSchema extends 
AbstractDeserializationSchema<ObjectNode> {
+
+   private static final long serialVersionUID = -1699854177598621044L;
+
+   private ObjectMapper mapper;
+
+   @Override
+   public ObjectNode deserialize(byte[] message) throws IOException {
--- End diff --

IMO, this method should be thread-safe.
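
A minimal sketch of one way to make the lazy initialization safe for concurrent calls, assuming a `volatile` field is acceptable; unshaded Jackson imports are used only to keep the sketch self-contained:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    import java.io.IOException;

    public class ThreadSafeLazyMapperSketch {

        // volatile gives safe publication; ObjectMapper itself is thread-safe once configured.
        private transient volatile ObjectMapper mapper;

        public ObjectNode deserialize(byte[] message) throws IOException {
            ObjectMapper m = mapper;
            if (m == null) {
                m = new ObjectMapper();
                mapper = m; // benign race: at worst two mappers are created briefly
            }
            return m.readValue(message, ObjectNode.class);
        }
    }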


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168439885
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
 ---
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link 
Row} for representing
+ * objects and tuple arrays.
+ *
+ * Note: This converter implements just a subset of the JSON schema 
specification.
+ * Union types (as well as "allOf", "anyOf", "not") are not supported yet. 
Simple
+ * references that link to a common definition in the document are 
supported. "oneOf" and
+ * arrays of type are only supported for specifying nullability;
+ */
+@SuppressWarnings("OptionalIsPresent")
+public final class JsonSchemaConverter {
+
+   private JsonSchemaConverter() {
+   // private
+   }
+
+   // see 
https://spacetelescope.github.io/understanding-json-schema/UnderstandingJSONSchema.pdf
+   private static final String PROPERTIES = "properties";
+   private static final String ADDITIONAL_PROPERTIES = 
"additionalProperties";
+   private static final String TYPE = "type";
+   private static final String FORMAT = "format";
+   private static final String CONTENT_ENCODING = "contentEncoding";
+   private static final String ITEMS = "items";
+   private static final String ADDITIONAL_ITEMS = "additionalItems";
+   private static final String REF = "$ref";
+   private static final String ALL_OF = "allOf";
+   private static final String ANY_OF = "anyOf";
+   private static final String NOT = "not";
+   private static final String ONE_OF = "oneOf";
+
+   // from https://tools.ietf.org/html/draft-zyp-json-schema-03#page-14
+   private static final String DISALLOW = "disallow";
+   private static final String EXTENDS = "extends";
+
+   private static final String TYPE_NULL = "null";
+   private static final String TYPE_BOOLEAN = "boolean";
+   private static final String TYPE_OBJECT = "object";
+   private static final String TYPE_ARRAY = "array";
+   private static final String TYPE_NUMBER = "number";
+   private static final String TYPE_INTEGER = "integer";
+   private static final String TYPE_STRING = "string";
+
+   private static final String FORMAT_DATE = "date";
+   private static final String FORMAT_TIME = "time";
+   private static final String FORMAT_DATE_TIME = "date-time";
+
+   private static final String CONTENT_ENCODING_BASE64 = "base64";
+
+   /**
+* Converts a JSON schema into Flink's type information. Throws an 
exception of the schema
--- End diff --

of -> if


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168436287
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an 
ObjectNode.
+ *
+ * Fields can be accessed by calling 
objectNode.get(<name>).as(<type>)
+ */
+@PublicEvolving
+public class JsonNodeDeserializationSchema extends 
AbstractDeserializationSchema<ObjectNode> {
+
+   private static final long serialVersionUID = -1699854177598621044L;
+
+   private ObjectMapper mapper;
+
+   @Override
+   public ObjectNode deserialize(byte[] message) throws IOException {
+   if (mapper == null) {
+   mapper = new ObjectMapper();
+   }
+   return mapper.readValue(message, ObjectNode.class);
+   }
+
+   @Override
+   public boolean isEndOfStream(ObjectNode nextElement) {
--- End diff --

This overridden method seems to be redundant.
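
A minimal sketch of what the class could look like without the override, assuming the base class's inherited `isEndOfStream()` (which returns false) is the intended behavior; unshaded Jackson imports are used only to keep the sketch self-contained:

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    import java.io.IOException;

    public class MinimalJsonNodeDeserializationSchema
            extends AbstractDeserializationSchema<ObjectNode> {

        private transient ObjectMapper mapper;

        @Override
        public ObjectNode deserialize(byte[] message) throws IOException {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            return mapper.readValue(message, ObjectNode.class);
        }
        // no isEndOfStream() override: the inherited default already returns false
    }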


---


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-31 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Sure. Thanks for helping merge this!


---


[GitHub] flink pull request #5369: [FLINK-8407][DataStream]Setting the parallelism af...

2018-01-31 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5369


---


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-30 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Ah ha, it doesn't matter. 😄 The test has been updated. Actually, I 
wanted to ensure that all the partitioning methods cause the exception. 
However, that would be fussy, so I only kept the broadcast one.


---


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-30 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
That makes sense to me. I just wonder what you mean by "add a test for 
the Java API"...


---


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-30 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Hi @aljoscha, since the `DataStream` in the Java API does not have a 
`setParallelism` method like the Scala API does, do you think it's necessary to 
add one?


---


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-26 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Sure. I'll update the PR to make it more appropriate and thanks for your 
review, @aljoscha.


---


[GitHub] flink pull request #5369: [FLINK-8407] [DataStream] Setting the parallelism ...

2018-01-26 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5369

[FLINK-8407] [DataStream] Setting the parallelism after a partitionin…

## What is the purpose of the change

This PR forbids users from setting the parallelism after a partitioning operation 
(e.g., broadcast, rescale).

## Brief change log

Removes the overridden method for `setConnectionType` in 
`SingleOutputStreamOperator.java`.

## Verifying this change

This change can be verified by the added test 
`testParallelismFailAfterPartitioning` in `DataStreamTest.scala`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8407

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5369


commit eb725c745ff24442c5f606402c822c517d36a743
Author: Xingcan Cui <xingcanc@...>
Date:   2018-01-26T02:28:58Z

[Flink-8407] [DataStream] Setting the parallelism after a partitioning 
operation should be forbidden




---


[GitHub] flink pull request #5368: [Flink-8407][DataStream]Setting the parallelism af...

2018-01-26 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5368


---


[GitHub] flink pull request #5368: [Flink-8407][DataStream]Setting the parallelism af...

2018-01-26 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5368

[Flink-8407][DataStream]Setting the parallelism after a partitioning 
operation should be forbidden

## What is the purpose of the change

This PR forbids users from setting the parallelism after a partitioning operation 
(e.g., broadcast, rescale).

## Brief change log

Removes the overridden method for `setConnectionType` in 
`SingleOutputStreamOperator.java`.

## Verifying this change

This change can be verified by the added test 
`testParallelismFailAfterPartitioning` in `DataStreamTest.scala`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8407

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5368


commit eb725c745ff24442c5f606402c822c517d36a743
Author: Xingcan Cui <xingcanc@...>
Date:   2018-01-26T02:28:58Z

[Flink-8407] [DataStream] Setting the parallelism after a partitioning 
operation should be forbidden




---


[GitHub] flink issue #5210: [FLINK-8316] [table] The CsvTableSink and the CsvInputFor...

2018-01-15 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5210
  
Hi @sunjincheng121, thanks for your reply. An example would be a 
non-standard CSV line like `a, b , c,`: if the boolean flag 
`trailingDelimiter=false`, the line is parsed into 4 fields, while if 
`trailingDelimiter=true`, it is parsed into 3 fields, with the 
trailing delimiter `,` omitted. Further, the trailing delimiter could be set 
to another character, e.g., `a, b, c;`.
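
A tiny sketch of these semantics, written as a standalone helper rather than the actual `CsvInputFormat` code:

    import java.util.regex.Pattern;

    public final class TrailingDelimiterSketch {

        public static String[] parse(String line, char delimiter, boolean trailingDelimiter) {
            String effective = line;
            if (trailingDelimiter && !effective.isEmpty()
                    && effective.charAt(effective.length() - 1) == delimiter) {
                // drop the trailing delimiter before splitting
                effective = effective.substring(0, effective.length() - 1);
            }
            return effective.split(Pattern.quote(String.valueOf(delimiter)), -1);
        }

        public static void main(String[] args) {
            System.out.println(parse("a, b , c,", ',', false).length); // 4
            System.out.println(parse("a, b , c,", ',', true).length);  // 3
        }
    }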


---


[GitHub] flink issue #5210: [FLINK-8316] [table] The CsvTableSink and the CsvInputFor...

2018-01-09 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5210
  
Hi @sunjincheng121, yes, the reported issue should be solved with 
FLINK-8331. In addition, maybe we can add the trailing delimiter as a new 
feature (like the 
[CSVParser.java](https://github.com/apache/commons-csv/blob/master/src/main/java/org/apache/commons/csv/CSVParser.java)
 and 
[CSVFormat.java](https://github.com/apache/commons-csv/blob/master/src/main/java/org/apache/commons/csv/CSVFormat.java#L559)
 in commons-csv). I'll do some refactorings to allow separate trailing 
delimiters for both the input and output. What do you think?


---


[GitHub] flink issue #5210: [FLINK-8316] [table] The CsvTableSink and the CsvInputFor...

2018-01-08 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5210
  
Hi @sunjincheng121, thanks for the review! You may refer to [this 
thread](https://lists.apache.org/thread.html/cfe3b1718a479300dc91d1523be023ef5bc702bd5ad53af4fea5a596@%3Cuser.flink.apache.org%3E)
 for further background of the issue.

Thanks, Xingcan


---


[GitHub] flink issue #5140: [FLINK-7797] [table] Add support for windowed outer joins...

2018-01-05 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5140
  
Hi @fhueske, thanks for your concrete suggestions! IMO, the refactorings in 
`TimeBoundedStreamJoin` are quite reasonable, while the refactoring for 
`createNegativeWindowSizeJoin()` may not be as significant, since a negative 
window size should be treated as an exceptional case. Anyway, I've applied them for 
better efficiency. 😄

Thanks, Xingcan


---



[GitHub] flink issue #5169: [FLINK-8258] [table] Enable query configuration for batch...

2018-01-03 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5169
  
Thanks for the review @twalthr 😄 


---


[GitHub] flink pull request #5192: [FLINK-8257] [conf] Unify the value checks for set...

2018-01-02 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5192


---


[GitHub] flink issue #5192: [FLINK-8257] [conf] Unify the value checks for setParalle...

2018-01-02 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5192
  
Thanks for the review, @aljoscha 😄 . I'll close this PR.


---


[GitHub] flink issue #5140: [FLINK-7797] [table] Add support for windowed outer joins...

2018-01-02 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5140
  
Hi @fhueske, thanks for your review. I've made the following changes to the 
PR.
1. Fixes the "wrong sides" problem in `TimeBoundedStreamJoin`.
2. Adds the logic for outer-joins with negative window size in 
`DataStreamWindowJoin`.
3. Refines the `EmitAwareCollector` according to your suggestions.
4. Uses a separate class `OuterJoinPaddingUtil` to deal with result 
padding (an illustrative sketch of the padding idea follows below the list).
5. Adds some test cases to `JoinITCase` and `JoinHarnessTest`.
6. Other minor changes to attribute/method/class names.
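
For illustration only, a minimal sketch of the padding idea (not the actual `OuterJoinPaddingUtil`): the left row is copied into a wider result row and the missing right-hand fields stay null.

    import org.apache.flink.types.Row;

    public final class PaddingSketch {

        public static Row padLeft(Row leftRow, int rightArity) {
            Row result = new Row(leftRow.getArity() + rightArity);
            for (int i = 0; i < leftRow.getArity(); i++) {
                result.setField(i, leftRow.getField(i));
            }
            // fields [leftArity, leftArity + rightArity) remain null for the missing right side
            return result;
        }
    }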

Thanks, Xingcan


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159069304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
 if (rightTime >= rightQualifiedLowerBound && rightTime <= 
rightQualifiedUpperBound) {
   val rightRows = rightEntry.getValue
   var i = 0
+  var markEmitted = false
   while (i < rightRows.size) {
-joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
+joinCollector.resetThisTurn()
+val tuple = rightRows.get(i)
+joinFunction.join(leftRow, tuple.f0, joinCollector)
+if (joinType == JoinType.RIGHT_OUTER || joinType == 
JoinType.FULL_OUTER) {
+  if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+// Mark the right row as being successfully joined and 
emitted.
+tuple.f1 = true
+markEmitted = true
+  }
+}
 i += 1
   }
+  if (markEmitted) {
+// Write back the edited entry (mark emitted) for the right 
cache.
+rightEntry.setValue(rightRows)
+  }
 }
 
 if (rightTime <= rightExpirationTime) {
+  if (joinType == JoinType.LEFT_OUTER || joinType == 
JoinType.FULL_OUTER) {
--- End diff --

Yes, I should have added a harness test for that.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159069074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to track whether there's a joined result.
+  */
+class JoinAwareCollector extends Collector[Row]{
+
+  private var emitted = false
+  private var emittedThisTurn = false
+  private var innerCollector: Collector[CRow] = _
+  private val cRow: CRow = new CRow()
--- End diff --

I'll add a function to set this value in the Collector.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159023858
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
 s"${joinConditionToString(schema.relDataType, joinCondition, 
getExpressionString)}), " +
 s"join: (${joinSelectionToString(schema.relDataType)})"
 
-joinType match {
-  case JoinRelType.INNER =>
-if (relativeWindowSize < 0) {
-  LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
-" please check the join conditions.")
-  createEmptyInnerJoin(leftDataStream, rightDataStream, 
returnTypeInfo)
-} else {
-  if (isRowTime) {
-createRowTimeInnerJoin(
-  leftDataStream,
-  rightDataStream,
-  returnTypeInfo,
-  joinOpName,
-  joinFunction.name,
-  joinFunction.code,
-  leftKeys,
-  rightKeys
-)
-  } else {
-createProcTimeInnerJoin(
-  leftDataStream,
-  rightDataStream,
-  returnTypeInfo,
-  joinOpName,
-  joinFunction.name,
-  joinFunction.code,
-  leftKeys,
-  rightKeys
-)
-  }
-}
-  case JoinRelType.FULL =>
-throw new TableException(
-  "Full join between stream and stream is not supported yet.")
-  case JoinRelType.LEFT =>
-throw new TableException(
-  "Left join between stream and stream is not supported yet.")
-  case JoinRelType.RIGHT =>
-throw new TableException(
-  "Right join between stream and stream is not supported yet.")
+val flinkJoinType = joinType match {
+  case JoinRelType.INNER => JoinType.INNER
+  case JoinRelType.FULL => JoinType.FULL_OUTER
+  case JoinRelType.LEFT => JoinType.LEFT_OUTER
+  case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+}
+
+if (relativeWindowSize < 0) {
+  LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
+" please check the join conditions.")
+  createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
--- End diff --

Yes, you are right. I'll add this part.


---


[GitHub] flink pull request #5210: [FLINK-8316][table]The CsvTableSink and the CsvInp...

2017-12-28 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5210

[FLINK-8316][table]The CsvTableSink and the CsvInputFormat are not in sync

## What is the purpose of the change

This PR adds an extra parameter (`trailingDelim`) to `CsvTableSink` to 
enable appending a trailing field delimiter for each row.

## Brief change log

  - Adds an extra boolean parameter `trailingDelim` to `CsvTableSink`.
  - Adds a related test case to 
`org.apache.flink.table.runtime.batch.table.TableSinkITCase`.


## Verifying this change

This change can be verified by the added test case in `TableSinkITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (**yes**)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8316

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5210


commit 27a75a8ad593cddb69873c67ad9e486cca580dbe
Author: Xingcan Cui <xingcanc@...>
Date:   2017-12-28T08:30:01Z

[FLINK-8316][table]The CsvTableSink and the CsvInputFormat are not in sync




---


[GitHub] flink pull request #5177: [FLINK-8278] [doc] Fix the private member init pro...

2017-12-26 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5177


---


[GitHub] flink issue #5177: [FLINK-8278] [doc] Fix the private member init problem fo...

2017-12-26 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5177
  
Thanks for looking into this @fhueske. I'll close this PR.


---


[GitHub] flink issue #5177: [FLINK-8278] [doc] Fix the private member init problem fo...

2017-12-21 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5177
  
Hi @fhueske, I wonder if you could have a look at this PR when it is convenient 
for you.

Thanks, Xingcan


---

