flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule

2017-11-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.4 e7f7d0c93 -> 42e24413b


[FLINK-7986] [table] Introduce FilterSetOpTransposeRule

This closes #4956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42e24413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42e24413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42e24413

Branch: refs/heads/release-1.4
Commit: 42e24413b5a47928e06f2a61086f7559370c65d8
Parents: e7f7d0c
Author: Xpray 
Authored: Mon Nov 6 23:47:33 2017 +0800
Committer: twalthr 
Committed: Thu Nov 16 16:16:22 2017 +0100

--
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  2 +
 .../api/batch/table/SetOperatorsTest.scala  | 80 
 .../api/stream/table/SetOperatorsTest.scala | 68 +
 3 files changed, 150 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index dcc735d..a20d14f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -52,6 +52,8 @@ object FlinkRuleSets {
 FilterJoinRule.JOIN,
 // push filter through an aggregation
 FilterAggregateTransposeRule.INSTANCE,
+// push filter through set operation
+FilterSetOpTransposeRule.INSTANCE,
 
 // aggregation and projection rules
 AggregateProjectMergeRule.INSTANCE,
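
The rule registered above is Calcite's FilterSetOpTransposeRule: it pushes a Filter below a SetOp (union, minus, intersect) so that predicates are evaluated on each input before the set operation combines them. A minimal Table API sketch of the rewrite, mirroring the tests added by this commit (left and right are the tables from those tests):

    // Plan as written: the filter sits on top of the union.
    val result = left.unionAll(right).where('a > 0)

    // After FilterSetOpTransposeRule fires, the plan is equivalent to
    // pushing the predicate into both branches:
    val pushed = left.where('a > 0).unionAll(right.where('a > 0))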

http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 2d4e205..35f4429 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase {
 
 util.verifyJavaTable(in, expected)
   }
+
+  @Test
+  def testFilterUnionTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.unionAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetUnion",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("union", "a", "b", "c")
+),
+term("groupBy", "b"),
+term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+  ),
+  term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilterMinusTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.minusAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetMinus",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("minus", "a", "b", "c")
+),
+term("groupBy", "b"),
+term("select", "b", "SUM(a) AS TMP_

flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule

2017-11-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master cd1fbc078 -> 81dc260dc


[FLINK-7986] [table] Introduce FilterSetOpTransposeRule

This closes #4956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81dc260d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81dc260d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81dc260d

Branch: refs/heads/master
Commit: 81dc260dc653085b9dbf098e8fd70a72d2d0828e
Parents: cd1fbc0
Author: Xpray 
Authored: Mon Nov 6 23:47:33 2017 +0800
Committer: twalthr 
Committed: Thu Nov 16 14:43:50 2017 +0100

--
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  2 +
 .../api/batch/table/SetOperatorsTest.scala  | 80 
 .../api/stream/table/SetOperatorsTest.scala | 68 +
 3 files changed, 150 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index dcc735d..a20d14f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -52,6 +52,8 @@ object FlinkRuleSets {
 FilterJoinRule.JOIN,
 // push filter through an aggregation
 FilterAggregateTransposeRule.INSTANCE,
+// push filter through set operation
+FilterSetOpTransposeRule.INSTANCE,
 
 // aggregation and projection rules
 AggregateProjectMergeRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 2d4e205..35f4429 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase {
 
 util.verifyJavaTable(in, expected)
   }
+
+  @Test
+  def testFilterUnionTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.unionAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetUnion",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("union", "a", "b", "c")
+),
+term("groupBy", "b"),
+term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+  ),
+  term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilterMinusTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.minusAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetMinus",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("minus", "a", "b", "c")
+),
+term("groupBy", "b"),
+term("select", "b", "SUM(a) AS TMP_0", "COUNT

[1/5] flink git commit: [FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema.

2017-11-16 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.4 397f0d150 -> e7f7d0c93


[FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema.

This closes #4989.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b59fae3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b59fae3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b59fae3f

Branch: refs/heads/release-1.4
Commit: b59fae3f51b22c299c3d4c51ff0da74fd245072b
Parents: 397f0d1
Author: Shuyi Chen 
Authored: Thu Nov 9 00:05:20 2017 -0800
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:45:38 2017 +0100

--
 .../plan/schema/CompositeRelDataType.scala  |  4 +++-
 .../table/runtime/batch/sql/CalcITCase.scala| 19 ++
 .../table/runtime/batch/table/CalcITCase.scala  | 10 +++---
 .../table/runtime/stream/sql/SqlITCase.scala| 21 
 .../table/runtime/stream/table/CalcITCase.scala | 16 +++
 .../table/runtime/utils/StreamTestData.scala|  9 +
 6 files changed, 71 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b59fae3f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index e0c6b6f..f8c61fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -38,7 +38,9 @@ class CompositeRelDataType(
 val compositeType: CompositeType[_],
 val nullable: Boolean,
 typeFactory: FlinkTypeFactory)
-  extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
+  extends RelRecordType(
+StructKind.PEEK_FIELDS_NO_EXPAND,
+createFieldList(compositeType, typeFactory)) {
 
   override def toString = s"COMPOSITE($compositeType)"
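
The one-line change above is the whole fix: with StructKind.PEEK_FIELDS, Calcite expands composite (nested) fields when resolving "SELECT *"; with PEEK_FIELDS_NO_EXPAND each composite field stays a single column. A minimal sketch of the behavior the new tests below pin down:

    // 'a is a nested tuple field, 'b a String (see getSmallNestedTupleDataSet).
    val t = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv).as('a, 'b)
    tEnv.registerTable("MyTable", t)

    // With the fix, "SELECT *" keeps 'a as one composite column, producing
    // rows like "(1,1),one" instead of the expanded "1,1,one".
    val result = tEnv.sqlQuery("SELECT * FROM MyTable")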
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b59fae3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index b891a7d..7ca3e9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -67,6 +67,25 @@ class CalcITCase(
   }
 
   @Test
+  def testSelectStarFromNestedTable(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT * FROM MyTable"
+
+val ds = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv).as('a, 'b)
+tEnv.registerTable("MyTable", ds)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n"
+
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testSelectStarFromDataSet(): Unit = {
 
 val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/b59fae3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index e947c3f..22373d2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -118,14 +118,10 @@ class CalcITCase(
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
+val t = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b).select('*)
 
-val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\

[5/5] flink git commit: [FLINK-8069] [table] Add preserving WatermarkStrategy.

2017-11-16 Thread fhueske
[FLINK-8069] [table] Add preserving WatermarkStrategy.

This closes #5016.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7f7d0c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7f7d0c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7f7d0c9

Branch: refs/heads/release-1.4
Commit: e7f7d0c9333f3f9db488c5f968a1627401485067
Parents: f14fcef
Author: Xingcan Cui 
Authored: Wed Nov 15 11:01:13 2017 +0800
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:47:16 2017 +0100

--
 .../datastream/StreamTableSourceScan.scala  |  5 +-
 .../wmstrategies/watermarkStrategies.scala  |  6 +++
 .../stream/table/TableSourceITCase.scala| 51 
 .../flink/table/utils/testTableSources.scala| 28 ++-
 4 files changed, 88 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 5d305b4..9179d4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources._
-import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner}
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner, PreserveWatermarks}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -134,6 +134,9 @@ class StreamTableSourceScan(
 case p: PunctuatedWatermarkAssigner =>
  val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
   ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+case _: PreserveWatermarks =>
+  // The watermarks have already been provided by the underlying DataStream.
+  ingestedTable
   }
 } else {
   // No need to generate watermarks if no rowtime attribute is specified.

http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index 0dd82f1..4c7f4e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -60,3 +60,9 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
 */
   def getWatermark(row: Row, timestamp: Long): Watermark
 }
+
+/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
+class PreserveWatermarks extends WatermarkStrategy
+object PreserveWatermarks {
+  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
+}
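
For context, a TableSource opts into this strategy through the watermark strategy of its rowtime attribute descriptor. A minimal sketch, assuming the Flink 1.4 RowtimeAttributeDescriptor and StreamRecordTimestamp APIs (the descriptor below is illustrative, not part of this commit):

    import org.apache.flink.table.sources.RowtimeAttributeDescriptor
    import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
    import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks

    // Declare a rowtime attribute that reuses the timestamps of the underlying
    // DataStream records and keeps its watermarks untouched.
    val rowtimeDesc = new RowtimeAttributeDescriptor(
      "rowtime",                   // name of the rowtime attribute
      new StreamRecordTimestamp,   // read timestamps from the stream records
      PreserveWatermarks.INSTANCE) // do not generate new watermarks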

http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index 77c1e08..c9ea30a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.a

[3/5] flink git commit: [FLINK-8014] [table] Add Kafka010JsonTableSink.

2017-11-16 Thread fhueske
[FLINK-8014] [table] Add Kafka010JsonTableSink.

- Refactor KafkaTableSink tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fd53112
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fd53112
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fd53112

Branch: refs/heads/release-1.4
Commit: 2fd53112289ec9002b2c26920dd56ea77be73cd5
Parents: 8883fa2
Author: Fabian Hueske 
Authored: Tue Nov 7 17:59:43 2017 +0100
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:45:54 2017 +0100

--
 .../connectors/kafka/Kafka010JsonTableSink.java | 73 
 .../kafka/Kafka010JsonTableSinkTest.java| 53 ++
 .../connectors/kafka/Kafka08JsonTableSink.java  | 26 ++-
 .../kafka/Kafka08JsonTableSinkTest.java | 27 +++-
 .../connectors/kafka/Kafka09JsonTableSink.java  | 26 ++-
 .../kafka/Kafka09JsonTableSinkTest.java | 27 +++-
 .../connectors/kafka/KafkaJsonTableSink.java|  5 +-
 .../connectors/kafka/KafkaTableSink.java| 10 ++-
 .../JsonRowSerializationSchema.java | 22 +-
 .../kafka/JsonRowSerializationSchemaTest.java   | 46 
 .../kafka/KafkaTableSinkTestBase.java   | 30 
 11 files changed, 269 insertions(+), 76 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2fd53112/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
--
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
new file mode 100644
index 0000000..431ace0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka010JsonTableSink extends KafkaJsonTableSink {
+
+   /**
+* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+* topic with fixed partition assignment.
+*
+* Each parallel TableSink instance will write its rows to a single Kafka partition.
+* 
+* If the number of Kafka partitions is less than the number of sink instances, different
+* sink instances will write to the same partition.
+* If the number of Kafka partitions is higher than the number of sink instances, some
+* Kafka partitions won't receive data.
+* 
+*
+* @param topic topic in Kafka to which table is written
+* @param properties properties to connect to Kafka
+*/
+   public Kafka010JsonTableSink(String topic, Properties properties) {
+   super(topic, properties, new FlinkFixedPartitioner<>());
+   }
+
+   /**
+* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+* topic with custom partition assignment.
+*
+* @param topic topic in Kafka to which table is written
+* @param properties properties to connect to Kafka
+* @param partitioner Kafka partitioner
+*/
+   public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+   super(topic, properties, partitioner);

[4/5] flink git commit: [FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.

2017-11-16 Thread fhueske
[FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.

This closes #4990.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f14fcefb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f14fcefb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f14fcefb

Branch: refs/heads/release-1.4
Commit: f14fcefbe1b30eb8178a5caa053c84e505c288b2
Parents: 2fd5311
Author: Fabian Hueske 
Authored: Thu Nov 9 15:07:17 2017 +0100
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:46:04 2017 +0100

--
 docs/dev/table/sourceSinks.md | 45 ++
 1 file changed, 45 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f14fcefb/docs/dev/table/sourceSinks.md
--
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index dfa7954..0b4bdbe 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -496,6 +496,7 @@ The following table lists the `TableSink`s which are provided with Flink.
 | `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table. 
 | `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding.
 | `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding.
+| `Kafka010JsonTableSink` | `flink-connector-kafka-0.10` | N | Append | A Kafka 0.10 sink with JSON encoding.
 
 All sinks that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sinks, you have to add the respective dependency in addition to the `flink-table` dependency.
 
@@ -503,6 +504,50 @@ A custom `TableSink` can be defined by implementing the `BatchTableSink`, `Appen
 
 {% top %}
 
+### KafkaJsonTableSink
+
+A `KafkaJsonTableSink` emits a [streaming append `Table`](./streaming.html#table-to-stream-conversion) to an Apache Kafka topic. The rows of the table are encoded as JSON records. Currently, only tables with flat schema, i.e., non-nested fields, are supported. 
+
+A `KafkaJsonTableSink` produces with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing). 
+
+By default, a `KafkaJsonTableSink` writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom `FlinkKafkaPartitioner` can be provided.
+
+The following example shows how to create a `KafkaJsonTableSink` for Kafka 0.10. Sinks for Kafka 0.8 and 0.9 are instantiated analogously. 
+
+
+
+{% highlight java %}
+
+Table table = ...
+
+Properties props = new Properties();
+props.setProperty("bootstrap.servers", "localhost:9092");
+
+table.writeToSink(
+  new Kafka010JsonTableSink(
+"myTopic",// Kafka topic to write to
+props));  // Properties to configure the producer
+
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+
+val table: Table = ???
+
+val props = new Properties()
+props.setProperty("bootstrap.servers", "localhost:9092")
+
+table.writeToSink(
+  new Kafka010JsonTableSink(
+"myTopic",// Kafka topic to write to
+props))   // Properties to configure the producer
+  
+{% endhighlight %}
+
+
+
 ### CsvTableSink
 
 The `CsvTableSink` emits a `Table` to one or more CSV files. 
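
Regarding the custom FlinkKafkaPartitioner mentioned in the new documentation above, a minimal sketch of such a partitioner (the class and field choice are illustrative, not from this commit):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
    import org.apache.flink.types.Row

    // Route all rows with the same first field to the same Kafka partition.
    class RowFieldPartitioner extends FlinkKafkaPartitioner[Row] {
      override def partition(
          record: Row,
          key: Array[Byte],
          value: Array[Byte],
          targetTopic: String,
          partitions: Array[Int]): Int = {
        val h = record.getField(0).hashCode
        // Normalize the hash into [0, partitions.length).
        partitions(((h % partitions.length) + partitions.length) % partitions.length)
      }
    }

    // Passed via the second constructor:
    //   new Kafka010JsonTableSink("myTopic", props, new RowFieldPartitioner)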



[2/5] flink git commit: [FLINK-7389] [table] Remove Calcite PushProjector

2017-11-16 Thread fhueske
[FLINK-7389] [table] Remove Calcite PushProjector

This closes #5022.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8883fa2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8883fa2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8883fa2d

Branch: refs/heads/release-1.4
Commit: 8883fa2dd910909d4aaf5c57cf4b15a41c8cf7e1
Parents: b59fae3
Author: twalthr 
Authored: Thu Nov 16 10:20:16 2017 +0100
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:45:46 2017 +0100

--
 .../apache/calcite/rel/rules/PushProjector.java | 868 ---
 .../table/runtime/batch/sql/CalcITCase.scala|   2 +-
 .../table/runtime/batch/sql/JoinITCase.scala|   4 +-
 3 files changed, 4 insertions(+), 870 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8883fa2d/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
--
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
deleted file mode 100644
index 0955aeb..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
+++ /dev/null
@@ -1,868 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.Strong;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.SemiJoin;
-import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.runtime.PredicateImpl;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.util.BitSets;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.Set;
-
-// This class is copied from Apache Calcite except that it does not
-// automatically name the field using the name of the operators
-// as the Table API rejects special characters like '-' in the field names.
-
-/**
- * PushProjector is a utility class used to perform operations used in push
- * projection rules.
- *
- * Pushing is particularly interesting in the case of join, because there
- * are multiple inputs. Generally an expression can be pushed down to a
- * particular input if it depends upon no other inputs. If it can be pushed
- * down to both sides, it is pushed down to the left.
- *
- * Sometimes an expression needs to be split before it can be pushed down.
- * To flag that an expression cannot be split, specify a rule that it must be
- * preserved. Such an expression will be pushed down intact to one
- * of the inputs, or not pushed down at all.
- */
-public class PushProjector {
-  //~ Instance fields 
-
-  private final Project origProj;
-  private final RexNode origFilter;
-  private final RelNode childRel;
-  private final ExprCondition preserveExprCondition;
-  private final RelBuilder relBuilder;
-
-  /**
-   * Original projection expressions
-   */
-  final List<RexNode> origProjExprs;
-
-  /**
-   * Fields from the RelNode that the projection is being pushed past
-   

[4/5] flink git commit: [FLINK-8069] [table] Add preserving WatermarkStrategy.

2017-11-16 Thread fhueske
[FLINK-8069] [table] Add preserving WatermarkStrategy.

This closes #5016.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd1fbc07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd1fbc07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd1fbc07

Branch: refs/heads/master
Commit: cd1fbc0789d039e38c10d0a978137156aacbf186
Parents: c697bc1
Author: Xingcan Cui 
Authored: Wed Nov 15 11:01:13 2017 +0800
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:32:12 2017 +0100

--
 .../datastream/StreamTableSourceScan.scala  |  5 +-
 .../wmstrategies/watermarkStrategies.scala  |  6 +++
 .../stream/table/TableSourceITCase.scala| 51 
 .../flink/table/utils/testTableSources.scala| 28 ++-
 4 files changed, 88 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 5d305b4..9179d4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources._
-import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner}
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner, PreserveWatermarks}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -134,6 +134,9 @@ class StreamTableSourceScan(
 case p: PunctuatedWatermarkAssigner =>
  val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
   ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+case _: PreserveWatermarks =>
+  // The watermarks have already been provided by the underlying DataStream.
+  ingestedTable
   }
 } else {
   // No need to generate watermarks if no rowtime attribute is specified.

http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index 0dd82f1..4c7f4e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -60,3 +60,9 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
 */
   def getWatermark(row: Row, timestamp: Long): Watermark
 }
+
+/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
+class PreserveWatermarks extends WatermarkStrategy
+object PreserveWatermarks {
+  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index 77c1e08..c9ea30a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.da

[3/5] flink git commit: [FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.

2017-11-16 Thread fhueske
[FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.

This closes #4990.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c697bc14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c697bc14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c697bc14

Branch: refs/heads/master
Commit: c697bc1499de5db324ff64af0341ebfccf6a6125
Parents: 50fba9a
Author: Fabian Hueske 
Authored: Thu Nov 9 15:07:17 2017 +0100
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:32:12 2017 +0100

--
 docs/dev/table/sourceSinks.md | 45 ++
 1 file changed, 45 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c697bc14/docs/dev/table/sourceSinks.md
--
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index dfa7954..0b4bdbe 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -496,6 +496,7 @@ The following table lists the `TableSink`s which are provided with Flink.
 | `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table. 
 | `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding.
 | `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding.
+| `Kafka010JsonTableSink` | `flink-connector-kafka-0.10` | N | Append | A Kafka 0.10 sink with JSON encoding.
 
 All sinks that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sinks, you have to add the respective dependency in addition to the `flink-table` dependency.
 
@@ -503,6 +504,50 @@ A custom `TableSink` can be defined by implementing the `BatchTableSink`, `Appen
 
 {% top %}
 
+### KafkaJsonTableSink
+
+A `KafkaJsonTableSink` emits a [streaming append `Table`](./streaming.html#table-to-stream-conversion) to an Apache Kafka topic. The rows of the table are encoded as JSON records. Currently, only tables with flat schema, i.e., non-nested fields, are supported. 
+
+A `KafkaJsonTableSink` produces with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing). 
+
+By default, a `KafkaJsonTableSink` writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom `FlinkKafkaPartitioner` can be provided.
+
+The following example shows how to create a `KafkaJsonTableSink` for Kafka 0.10. Sinks for Kafka 0.8 and 0.9 are instantiated analogously. 
+
+
+
+{% highlight java %}
+
+Table table = ...
+
+Properties props = new Properties();
+props.setProperty("bootstrap.servers", "localhost:9092");
+
+table.writeToSink(
+  new Kafka010JsonTableSink(
+"myTopic",// Kafka topic to write to
+props));  // Properties to configure the producer
+
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+
+val table: Table = ???
+
+val props = new Properties()
+props.setProperty("bootstrap.servers", "localhost:9092")
+
+table.writeToSink(
+  new Kafka010JsonTableSink(
+"myTopic",// Kafka topic to write to
+props))   // Properties to configure the producer
+  
+{% endhighlight %}
+
+
+
 ### CsvTableSink
 
 The `CsvTableSink` emits a `Table` to one or more CSV files. 



[1/5] flink git commit: [FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema.

2017-11-16 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 101fef739 -> cd1fbc078


[FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema.

This closes #4989.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a63d2be5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a63d2be5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a63d2be5

Branch: refs/heads/master
Commit: a63d2be5091d7d2f77161ec17e685ae546f2b0ec
Parents: 101fef7
Author: Shuyi Chen 
Authored: Thu Nov 9 00:05:20 2017 -0800
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:32:11 2017 +0100

--
 .../plan/schema/CompositeRelDataType.scala  |  4 +++-
 .../table/runtime/batch/sql/CalcITCase.scala| 19 ++
 .../table/runtime/batch/table/CalcITCase.scala  | 10 +++---
 .../table/runtime/stream/sql/SqlITCase.scala| 21 
 .../table/runtime/stream/table/CalcITCase.scala | 16 +++
 .../table/runtime/utils/StreamTestData.scala|  9 +
 6 files changed, 71 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a63d2be5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index e0c6b6f..f8c61fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -38,7 +38,9 @@ class CompositeRelDataType(
 val compositeType: CompositeType[_],
 val nullable: Boolean,
 typeFactory: FlinkTypeFactory)
-  extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
+  extends RelRecordType(
+StructKind.PEEK_FIELDS_NO_EXPAND,
+createFieldList(compositeType, typeFactory)) {
 
   override def toString = s"COMPOSITE($compositeType)"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a63d2be5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index b891a7d..7ca3e9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -67,6 +67,25 @@ class CalcITCase(
   }
 
   @Test
+  def testSelectStarFromNestedTable(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT * FROM MyTable"
+
+val ds = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv).as('a, 'b)
+tEnv.registerTable("MyTable", ds)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n"
+
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testSelectStarFromDataSet(): Unit = {
 
 val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/a63d2be5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index e947c3f..22373d2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -118,14 +118,10 @@ class CalcITCase(
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
+val t = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b).select('*)
 
-val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-

[5/5] flink git commit: [FLINK-7389] [table] Remove Calcite PushProjector

2017-11-16 Thread fhueske
[FLINK-7389] [table] Remove Calcite PushProjector

This closes #5022.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc3eebd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc3eebd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc3eebd1

Branch: refs/heads/master
Commit: fc3eebd1e46c680a99ac22f20adda40841713294
Parents: a63d2be
Author: twalthr 
Authored: Thu Nov 16 10:20:16 2017 +0100
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:32:12 2017 +0100

--
 .../apache/calcite/rel/rules/PushProjector.java | 868 ---
 .../table/runtime/batch/sql/CalcITCase.scala|   2 +-
 .../table/runtime/batch/sql/JoinITCase.scala|   4 +-
 3 files changed, 4 insertions(+), 870 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fc3eebd1/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
--
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
deleted file mode 100644
index 0955aeb..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
+++ /dev/null
@@ -1,868 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.Strong;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.SemiJoin;
-import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.runtime.PredicateImpl;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.util.BitSets;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.Set;
-
-// This class is copied from Apache Calcite except that it does not
-// automatically name the field using the name of the operators
-// as the Table API rejects special characters like '-' in the field names.
-
-/**
- * PushProjector is a utility class used to perform operations used in push
- * projection rules.
- *
- * Pushing is particularly interesting in the case of join, because there
- * are multiple inputs. Generally an expression can be pushed down to a
- * particular input if it depends upon no other inputs. If it can be pushed
- * down to both sides, it is pushed down to the left.
- *
- * Sometimes an expression needs to be split before it can be pushed down.
- * To flag that an expression cannot be split, specify a rule that it must be
- * preserved. Such an expression will be pushed down intact to one
- * of the inputs, or not pushed down at all.
- */
-public class PushProjector {
-  //~ Instance fields 
-
-  private final Project origProj;
-  private final RexNode origFilter;
-  private final RelNode childRel;
-  private final ExprCondition preserveExprCondition;
-  private final RelBuilder relBuilder;
-
-  /**
-   * Original projection expressions
-   */
-  final List<RexNode> origProjExprs;
-
-  /**
-   * Fields from the RelNode that the projection is being pushed past
-   */
- 

[2/5] flink git commit: [FLINK-8014] [table] Add Kafka010JsonTableSink.

2017-11-16 Thread fhueske
[FLINK-8014] [table] Add Kafka010JsonTableSink.

- Refactor KafkaTableSink tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50fba9aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50fba9aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50fba9aa

Branch: refs/heads/master
Commit: 50fba9aa4e96632f7b32cf98d704683364196cbd
Parents: fc3eebd
Author: Fabian Hueske 
Authored: Tue Nov 7 17:59:43 2017 +0100
Committer: Fabian Hueske 
Committed: Thu Nov 16 11:32:12 2017 +0100

--
 .../connectors/kafka/Kafka010JsonTableSink.java | 73 
 .../kafka/Kafka010JsonTableSinkTest.java| 53 ++
 .../connectors/kafka/Kafka08JsonTableSink.java  | 26 ++-
 .../kafka/Kafka08JsonTableSinkTest.java | 27 +++-
 .../connectors/kafka/Kafka09JsonTableSink.java  | 26 ++-
 .../kafka/Kafka09JsonTableSinkTest.java | 27 +++-
 .../connectors/kafka/KafkaJsonTableSink.java|  5 +-
 .../connectors/kafka/KafkaTableSink.java| 10 ++-
 .../JsonRowSerializationSchema.java | 22 +-
 .../kafka/JsonRowSerializationSchemaTest.java   | 46 
 .../kafka/KafkaTableSinkTestBase.java   | 30 
 11 files changed, 269 insertions(+), 76 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
--
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
new file mode 100644
index 0000000..431ace0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka010JsonTableSink extends KafkaJsonTableSink {
+
+   /**
+* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+* topic with fixed partition assignment.
+*
+* Each parallel TableSink instance will write its rows to a single Kafka partition.
+* 
+* If the number of Kafka partitions is less than the number of sink instances, different
+* sink instances will write to the same partition.
+* If the number of Kafka partitions is higher than the number of sink instances, some
+* Kafka partitions won't receive data.
+* 
+*
+* @param topic topic in Kafka to which table is written
+* @param properties properties to connect to Kafka
+*/
+   public Kafka010JsonTableSink(String topic, Properties properties) {
+   super(topic, properties, new FlinkFixedPartitioner<>());
+   }
+
+   /**
+* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+* topic with custom partition assignment.
+*
+* @param topic topic in Kafka to which table is written
+* @param properties properties to connect to Kafka
+* @param partitioner Kafka partitioner
+*/
+   public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+   super(topic, properties, partitioner);
+   

flink git commit: [FLINK-7698] [table] Tests joins with null literals

2017-11-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.4 13962e1ff -> 397f0d150


[FLINK-7698] [table] Tests joins with null literals


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/397f0d15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/397f0d15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/397f0d15

Branch: refs/heads/release-1.4
Commit: 397f0d150255ea24fd5fd4cc38848441bb0fff58
Parents: 13962e1
Author: twalthr 
Authored: Thu Nov 16 11:00:30 2017 +0100
Committer: twalthr 
Committed: Thu Nov 16 11:06:14 2017 +0100

--
 .../flink/table/api/stream/sql/JoinTest.scala   | 45 
 1 file changed, 45 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/397f0d15/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 8c1865c..c14b698 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -19,7 +19,9 @@ package org.apache.flink.table.api.stream.sql
 
 import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.runtime.join.WindowJoinUtil
 import org.apache.flink.table.utils.TableTestUtil.{term, _}
@@ -245,6 +247,49 @@ class JoinTest extends TableTestBase {
   }
 
   @Test
+  def testJoinWithNullLiteral(): Unit = {
+val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)
+
+val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime, 12L as 'nullField)
+
+streamUtil.tableEnv.registerTable("T1", t1)
+streamUtil.tableEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 AS t1
+|JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND
+|  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+|  t2.proctime + INTERVAL '5' SECOND
+|""".stripMargin
+
+val expected =
+  unaryNode("DataStreamCalc",
+binaryNode("DataStreamWindowJoin",
+  unaryNode("DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "proctime", "null AS nullField")
+  ),
+  unaryNode("DataStreamCalc",
+streamTableNode(1),
+term("select", "a", "c", "proctime", "12 AS nullField")
+  ),
+  term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, 
" +
+"-(proctime0, 5000)), <=(proctime, DATETIME_PLUS(proctime0, 
5000)))"),
+  term("join", "a", "c", "proctime", "nullField", "a0", "c0", 
"proctime0", "nullField0"),
+  term("joinType", "InnerJoin")
+),
+term("select", "a0 AS a", "c0 AS c", "c AS c0")
+  )
+streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
 
 val sqlQuery =



flink git commit: [FLINK-7698] [table] Tests joins with null literals

2017-11-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master d0b2aa28d -> 101fef739


[FLINK-7698] [table] Tests joins with null literals


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/101fef73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/101fef73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/101fef73

Branch: refs/heads/master
Commit: 101fef7397128b0aba23221481ab86f62b18118f
Parents: d0b2aa2
Author: twalthr 
Authored: Thu Nov 16 11:00:30 2017 +0100
Committer: twalthr 
Committed: Thu Nov 16 11:00:30 2017 +0100

--
 .../flink/table/api/stream/sql/JoinTest.scala   | 45 
 1 file changed, 45 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/101fef73/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 8c1865c..c14b698 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -19,7 +19,9 @@ package org.apache.flink.table.api.stream.sql
 
 import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.runtime.join.WindowJoinUtil
 import org.apache.flink.table.utils.TableTestUtil.{term, _}
@@ -245,6 +247,49 @@ class JoinTest extends TableTestBase {
   }
 
   @Test
+  def testJoinWithNullLiteral(): Unit = {
+val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)
+
+val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime, 12L as 'nullField)
+
+streamUtil.tableEnv.registerTable("T1", t1)
+streamUtil.tableEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 AS t1
+|JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND
+|  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+|  t2.proctime + INTERVAL '5' SECOND
+|""".stripMargin
+
+val expected =
+  unaryNode("DataStreamCalc",
+binaryNode("DataStreamWindowJoin",
+  unaryNode("DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "proctime", "null AS nullField")
+  ),
+  unaryNode("DataStreamCalc",
+streamTableNode(1),
+term("select", "a", "c", "proctime", "12 AS nullField")
+  ),
+  term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, 
" +
+"-(proctime0, 5000)), <=(proctime, DATETIME_PLUS(proctime0, 
5000)))"),
+  term("join", "a", "c", "proctime", "nullField", "a0", "c0", 
"proctime0", "nullField0"),
+  term("joinType", "InnerJoin")
+),
+term("select", "a0 AS a", "c0 AS c", "c AS c0")
+  )
+streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
 
 val sqlQuery =