[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428247#comment-16428247
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5758


> Add support for unified table source and sink declaration in environment file
> -
>
> Key: FLINK-9059
> URL: https://issues.apache.org/jira/browse/FLINK-9059
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> 1) Add a common property called "type" with the single value 'source'.
> 2) In the YAML file, replace "sources" with "tables".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428168#comment-16428168
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179715295
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment file. Environment files
- * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might be defined by default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-   private Map sources;
+   private Map tables;
--- End diff --

Also true. I will merge this now to have it in the next release. We can 
still refactor if necessary, since it is an internal API.




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428081#comment-16428081
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179686471
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
--- End diff --

Thanks a lot, Timo. We can consider refactoring the code later. How about 
simply moving statisticsDescriptor to SourceTableDescriptor for now? 
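
A minimal Java sketch of that move, assuming simplified stand-in classes (all names below are hypothetical, and plain strings replace the real connector, format, and schema descriptors): the shared fields stay in the common base class, while statistics live only on the source side.

```java
import java.util.Optional;

// Hypothetical, simplified stand-ins; these are not the actual Flink descriptor classes.
abstract class SketchTableDescriptor {
    // Fields that both sources and sinks can meaningfully carry in this sketch.
    protected String connector;
    protected String format;
    protected String schema;
}

final class SketchSourceTableDescriptor extends SketchTableDescriptor {
    // Statistics only describe data that is read, so they are kept on the source.
    private Long rowCount;

    SketchSourceTableDescriptor withRowCount(long count) {
        this.rowCount = count;
        return this;
    }

    Optional<Long> getRowCount() {
        return Optional.ofNullable(rowCount);
    }
}

final class SketchSinkTableDescriptor extends SketchTableDescriptor {
    // Intentionally no statistics field: a sink has nothing to report here.
}
```

With this split, a sink descriptor cannot accidentally expose or serialize statistics, because the field does not exist on that type.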




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428080#comment-16428080
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179686465
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment file. Environment files
- * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might be defined by default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-   private Map sources;
+   private Map tables;
--- End diff --

That is another option. But the purpose of a single map is that if a table is 
both a source and a sink, we don't need to duplicate its config in both the 
sources and sinks sections. Duplication is error-prone: you might modify the 
table config in the sources section but forget to modify the config of the 
same table in the sinks section, causing inconsistency. What do you think?
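
To make the single-map argument concrete, here is a minimal Java sketch. The class, the `connector.path` key, and the `both` type value are assumptions for illustration only; at this point the PR accepts just `source`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a unified registry: table name -> properties, including "type".
final class UnifiedTablesSketch {
    private final Map<String, Map<String, String>> tables = new HashMap<>();

    void register(String name, String type, String path) {
        Map<String, String> props = new HashMap<>();
        props.put("type", type);
        props.put("connector.path", path); // illustrative key, not taken from the PR
        tables.put(name, props);
    }

    // A config change touches exactly one entry, whatever roles the table plays.
    void updatePath(String name, String newPath) {
        tables.get(name).put("connector.path", newPath);
    }

    boolean isSource(String name) {
        return hasRole(name, "source");
    }

    boolean isSink(String name) {
        return hasRole(name, "sink");
    }

    String pathOf(String name) {
        return tables.get(name).get("connector.path");
    }

    private boolean hasRole(String name, String role) {
        String type = tables.get(name).get("type");
        return role.equals(type) || "both".equals(type);
    }
}
```

Because the table is declared once, updating its path cannot leave a stale copy behind in a second section.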




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425717#comment-16425717
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179182994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
--- End diff --

Thanks for pointing this out. You are right. Sinks can have a schema but no 
statistics. I was just wondering if we really need most of the refactorings in 
this PR. We need to rework the `TableSourceDescriptor` class in the near future 
because a Java user can access all `protected` fields, which is not very nice 
API design. 




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425676#comment-16425676
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179178156
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
--- End diff --

I was wondering if a sink could have its own "schema configurations" for 
alignment with the output table schema? 
For example a CassandraTableSink / JDBCTableSink would definitely throw 
exceptions when trying to execute an insert with mismatched schemas.
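
A check of the kind a sink-side schema would enable might look like the sketch below. It is only an illustration: the class name is hypothetical, field types are modeled as plain strings, and `IllegalArgumentException` stands in for whatever a real sink would throw.

```java
import java.util.List;

// Hypothetical sketch: fail early when the query output does not line up with the sink.
final class SinkSchemaCheckSketch {

    /** Throws if the output field types differ from the types the sink declares. */
    static void checkCompatible(List<String> outputTypes, List<String> sinkTypes) {
        if (!outputTypes.equals(sinkTypes)) {
            throw new IllegalArgumentException(
                "Schema mismatch: query produces " + outputTypes
                    + " but sink expects " + sinkTypes);
        }
    }
}
```

Running such a check when the table is registered would surface the mismatch before any insert is attempted against the external system.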




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425558#comment-16425558
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179149794
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment file. Environment files
- * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might be defined by default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-   private Map sources;
+   private Map tables;
--- End diff --

Why not maintain two separate maps for sources and sinks? Then we don't 
need `instanceof` checks. If a table is both, we can simply add it to both maps.
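
For comparison, the two-map alternative could be sketched roughly as follows (hypothetical class and method names, with plain string maps standing in for the real table configs):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of keeping sources and sinks in separate maps.
final class SeparateMapsSketch {
    private final Map<String, Map<String, String>> sources = new HashMap<>();
    private final Map<String, Map<String, String>> sinks = new HashMap<>();

    void addSource(String name, Map<String, String> config) {
        sources.put(name, config);
    }

    void addSink(String name, Map<String, String> config) {
        sinks.put(name, config);
    }

    // A table that plays both roles is simply registered in both maps.
    void addBoth(String name, Map<String, String> config) {
        addSource(name, config);
        addSink(name, config);
    }

    // Role lookups are plain key checks; no type inspection is needed.
    boolean isSource(String name) {
        return sources.containsKey(name);
    }

    boolean isSink(String name) {
        return sinks.containsKey(name);
    }
}
```

The lookup code gets simpler, at the cost of one table name appearing in two places.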




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425559#comment-16425559
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179150731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
--- End diff --

I'm wondering if we really need these changes. A sink will never have a 
schema or statistics.




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423629#comment-16423629
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user suez1224 closed the pull request at:

https://github.com/apache/flink/pull/5758




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423631#comment-16423631
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

GitHub user suez1224 reopened a pull request:

https://github.com/apache/flink/pull/5758

[FLINK-9059][Table API & SQL] add table type attribute; replace "sources" 
with "tables" in environm…

## What is the purpose of the change

Add support for unified table source and sink declaration in the environment 
file definition.
This change prepares for FLINK-9049 (Create unified interfaces to configure 
and instantiate TableSink). We want to get this change in before 1.5 so it 
won't break the API in the next Flink release.


## Brief change log

  - Refactor the SQL Client environment file definition to replace "sources" with "tables"
  - Add "type" property to distinguish between table source and sink.
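
As an illustration of the change log above, the sketch below models the parsed `tables` section as a list of property maps and picks out the entries whose `type` is `source`. The class name, the `name` key, and the table names are assumptions; only the `type` key and its `source` value come from this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how entries of a unified "tables" section could be told apart.
final class EnvironmentTablesSketch {
    static final String TABLE_TYPE = "type";
    static final String TABLE_TYPE_SOURCE = "source";

    /** Builds one table entry as it might look after parsing the environment file. */
    static Map<String, Object> table(String name, String type) {
        Map<String, Object> config = new HashMap<>();
        config.put("name", name); // the "name" key is an assumption of this sketch
        config.put(TABLE_TYPE, type);
        return config;
    }

    /** Returns the names of all entries declared with type 'source', in input order. */
    static List<String> sourceNames(List<Map<String, Object>> tables) {
        List<String> names = new ArrayList<>();
        for (Map<String, Object> config : tables) {
            if (TABLE_TYPE_SOURCE.equals(config.get(TABLE_TYPE))) {
                names.add(String.valueOf(config.get("name")));
            }
        }
        return names;
    }
}
```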

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-9059

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5758


commit 7886287edd464c4a9fc84a47be214ff39b8e5ad0
Author: Shuyi Chen 
Date:   2018-03-23T06:00:00Z

add table type attribute; replace "sources" with "tables" in environment 
file

commit 63a3faa115254564cb5fb8e85f0e0e50e33062fe
Author: Shuyi Chen 
Date:   2018-03-30T17:12:10Z

fix comments






[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420750#comment-16420750
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r178334057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    connectorDescriptor.foreach(_.addProperties(properties))
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+    * Reads table statistics from the descriptors properties.
+    */
+  protected def getTableStats: Option[TableStats] = {
+    val normalizedProps = new DescriptorProperties()
+    addProperties(normalizedProps)
+    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+    rowCount match {
+      case Some(cnt) =>
+        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+        Some(TableStats(cnt, columnStats.asJava))
+      case None =>
+        None
+    }
+  }
+}
+
+object TableDescriptor {
+  /**
+    * Key for describing the type of this table, valid values are ('source').
+    */
+  val TABLE_TYPE = "type"
+
+  /**
+    * Valid TABLE_TYPE value.
+    */
+  val TABLE_TYPE_SOURCE = "source"
--- End diff --

Makes sense.




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420749#comment-16420749
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r178334028
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment file. Environment files
- * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might be defined by default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-   private Map sources;
+   private Map tables;
 
private Execution execution;
 
private Deployment deployment;
 
public Environment() {
-   this.sources = Collections.emptyMap();
+   this.tables = Collections.emptyMap();
this.execution = new Execution();
this.deployment = new Deployment();
}
 
-   public Map getSources() {
-   return sources;
+   public Map getTables() {
+   return tables;
}
 
-   public void setSources(List> sources) {
-   this.sources = new HashMap<>(sources.size());
-   sources.forEach(config -> {
-   final Source s = Source.create(config);
-   if (this.sources.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate source 
name '" + s + "'.");
+   public void setTables(List> tables) {
+   this.tables = new HashMap<>(tables.size());
+   tables.forEach(config -> {
+   if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
+   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
--- End diff --

Got it, so `both` should probably be added here once `sink` type is 
supported in FLINK-8866. Thanks for the clarification.
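
The check under discussion, extended with a set of accepted values, could be sketched as below. This is an illustration under stated assumptions rather than the PR's code: `IllegalArgumentException` stands in for `SqlClientException`, the class name is hypothetical, and the set holds only `source` for now, with `sink` and `both` as values that might be added later.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of validating the "type" attribute of one table entry.
final class TableTypeValidationSketch {
    static final String TABLE_TYPE = "type";

    // Only "source" is accepted here; "sink" and "both" would be added to this set later.
    static final Set<String> SUPPORTED_TYPES = Collections.singleton("source");

    /** Returns the validated type, or throws if it is missing or unsupported. */
    static String validateType(Map<String, Object> config) {
        if (!config.containsKey(TABLE_TYPE)) {
            throw new IllegalArgumentException("The 'type' attribute of a table is missing.");
        }
        String type = String.valueOf(config.get(TABLE_TYPE));
        if (!SUPPORTED_TYPES.contains(type)) {
            throw new IllegalArgumentException("Unsupported table type '" + type + "'.");
        }
        return type;
    }
}
```

Keeping the accepted values in one set means that supporting a new type is a one-line change plus whatever handling the new role needs.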




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420715#comment-16420715
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r178330131
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    connectorDescriptor.foreach(_.addProperties(properties))
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+    * Reads table statistics from the descriptors properties.
+    */
+  protected def getTableStats: Option[TableStats] = {
+    val normalizedProps = new DescriptorProperties()
+    addProperties(normalizedProps)
+    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+    rowCount match {
+      case Some(cnt) =>
+        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+        Some(TableStats(cnt, columnStats.asJava))
+      case None =>
+        None
+    }
+  }
+}
+
+object TableDescriptor {
+  /**
+    * Key for describing the type of this table, valid values are ('source').
+    */
+  val TABLE_TYPE = "type"
+
+  /**
+    * Valid TABLE_TYPE value.
+    */
+  val TABLE_TYPE_SOURCE = "source"
--- End diff --

The code currently uses plain constants by convention; please see 
KafkaValidator or RowtimeValidator. It also seems like premature optimization 
to me, given how simply the constant is used.




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420249#comment-16420249
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r178244750
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -102,10 +112,10 @@ public static Environment parse(String content) throws IOException {
     public static Environment merge(Environment env1, Environment env2) {
         final Environment mergedEnv = new Environment();
 
-        // merge sources
-        final Map sources = new HashMap<>(env1.getSources());
-        mergedEnv.getSources().putAll(env2.getSources());
-        mergedEnv.sources = sources;
+        // merge tables
+        final Map sources = new HashMap<>(env1.getTables());
--- End diff --

Good catch on the naming.




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420247#comment-16420247
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r178244473
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment file. Environment files
- * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might be defined by default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-    private Map sources;
+    private Map tables;
 
     private Execution execution;
 
     private Deployment deployment;
 
     public Environment() {
-        this.sources = Collections.emptyMap();
+        this.tables = Collections.emptyMap();
         this.execution = new Execution();
         this.deployment = new Deployment();
     }
 
-    public Map getSources() {
-        return sources;
+    public Map getTables() {
+        return tables;
     }
 
-    public void setSources(List> sources) {
-        this.sources = new HashMap<>(sources.size());
-        sources.forEach(config -> {
-            final Source s = Source.create(config);
-            if (this.sources.containsKey(s.getName())) {
-                throw new SqlClientException("Duplicate source name '" + s + "'.");
+    public void setTables(List> tables) {
+        this.tables = new HashMap<>(tables.size());
+        tables.forEach(config -> {
+            if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
+                throw new SqlClientException("The 'type' attribute of a table is missing.");
--- End diff --

Yes, the values can be source, sink, or both; please see 
https://issues.apache.org/jira/browse/FLINK-8866.
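
For illustration only, here is a minimal sketch of how the 'type' check could look once all three values are accepted. The class `TableTypeCheck` and the method `requireType` are hypothetical names and not part of this pull request, which only defines 'source'. 'sink' and 'both' are taken from the comment above and are assumed to arrive with FLINK-8866.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of the pull request. */
final class TableTypeCheck {

    /** Same key as TableDescriptor.TABLE_TYPE in the diff. */
    static final String TABLE_TYPE = "type";

    /** Assumption: 'sink' and 'both' follow from FLINK-8866; only 'source' exists in this PR. */
    static final Set<String> ACCEPTED_TYPES =
            new HashSet<>(Arrays.asList("source", "sink", "both"));

    /** Returns the table type of one table entry, or throws if it is missing or unknown. */
    static String requireType(Map<String, Object> config) {
        final Object value = config.get(TABLE_TYPE);
        if (value == null) {
            throw new IllegalArgumentException("The 'type' attribute of a table is missing.");
        }
        final String type = value.toString();
        if (!ACCEPTED_TYPES.contains(type)) {
            throw new IllegalArgumentException("Invalid table 'type' attribute value: '" + type + "'.");
        }
        return type;
    }

    public static void main(String[] args) {
        final Map<String, Object> config = new java.util.HashMap<>();
        config.put(TABLE_TYPE, "source");
        System.out.println(requireType(config));
    }
}
```

The sketch throws `IllegalArgumentException` to stay self-contained; the actual code in the diff uses `SqlClientException`.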




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417836#comment-16417836
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r177830071
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -102,10 +112,10 @@ public static Environment parse(String content) throws IOException {
     public static Environment merge(Environment env1, Environment env2) {
         final Environment mergedEnv = new Environment();
 
-        // merge sources
-        final Map sources = new HashMap<>(env1.getSources());
-        mergedEnv.getSources().putAll(env2.getSources());
-        mergedEnv.sources = sources;
+        // merge tables
+        final Map sources = new HashMap<>(env1.getTables());
--- End diff --

`final Map tables`
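
A minimal sketch of the merge step with the local variable renamed as suggested. The helper `TableMerge.mergeTables` is a hypothetical name, and the precedence (entries of the second environment overwrite entries of the first on a name collision) is an assumption for illustration, not something the diff confirms.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the pull request. */
final class TableMerge {

    /**
     * Copies the tables of the first environment into a fresh map and then adds
     * the tables of the second one. Assumption: the second environment wins
     * when both define a table with the same name.
     */
    static <T> Map<String, T> mergeTables(Map<String, T> env1Tables, Map<String, T> env2Tables) {
        final Map<String, T> tables = new HashMap<>(env1Tables);
        tables.putAll(env2Tables);
        return tables;
    }

    public static void main(String[] args) {
        final Map<String, String> first = new HashMap<>();
        first.put("Orders", "from-defaults");
        final Map<String, String> second = new HashMap<>();
        second.put("Orders", "from-session");
        second.put("Clicks", "from-session");
        System.out.println(mergeTables(first, second));
    }
}
```

Copying into a new `HashMap` also avoids writing into the immutable `Collections.emptyMap()` that the `Environment` constructor assigns by default.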




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417835#comment-16417835
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r177836030
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    connectorDescriptor.foreach(_.addProperties(properties))
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+    * Reads table statistics from the descriptors properties.
+    */
+  protected def getTableStats: Option[TableStats] = {
+    val normalizedProps = new DescriptorProperties()
+    addProperties(normalizedProps)
+    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+    rowCount match {
+      case Some(cnt) =>
+        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+        Some(TableStats(cnt, columnStats.asJava))
+      case None =>
+        None
+    }
+  }
+}
+
+object TableDescriptor {
+  /**
+    * Key for describing the type of this table, valid values are ('source').
+    */
+  val TABLE_TYPE = "type"
+
+  /**
+    * Valid TABLE_TYPE value.
+    */
+  val TABLE_TYPE_SOURCE = "source"
--- End diff --

Maybe we can use either an enum or a hierarchy to represent the "valid" types? 
Does something like `TABLE_TYPE.SOURCE_TYPE` seem like a good idea?
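
To make the suggestion concrete, here is a rough sketch of what an enum-based representation might look like. `TableType`, `key()` and `fromKey` are hypothetical names chosen for illustration; the pull request itself uses the plain string constants shown in the diff, and only the 'source' value exists there.

```java
/** Hypothetical enum of valid table types; only SOURCE is defined in this pull request. */
enum TableType {
    SOURCE("source");

    /** String value as it would appear under the 'type' key. */
    private final String key;

    TableType(String key) {
        this.key = key;
    }

    String key() {
        return key;
    }

    /** Maps the string value of the 'type' property to a constant, rejecting unknown values. */
    static TableType fromKey(String value) {
        for (TableType type : values()) {
            if (type.key.equals(value)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown table type '" + value + "'.");
    }
}

final class TableTypeDemo {
    public static void main(String[] args) {
        System.out.println(TableType.fromKey("source").key());
    }
}
```

With an enum, the set of valid values lives in one place and unknown strings fail at parse time; the cost is one more type for what is currently a single constant.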




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417834#comment-16417834
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r177828794
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment file. Environment files
- * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might be defined by default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-    private Map sources;
+    private Map tables;
 
     private Execution execution;
 
     private Deployment deployment;
 
     public Environment() {
-        this.sources = Collections.emptyMap();
+        this.tables = Collections.emptyMap();
         this.execution = new Execution();
         this.deployment = new Deployment();
     }
 
-    public Map getSources() {
-        return sources;
+    public Map getTables() {
+        return tables;
     }
 
-    public void setSources(List> sources) {
-        this.sources = new HashMap<>(sources.size());
-        sources.forEach(config -> {
-            final Source s = Source.create(config);
-            if (this.sources.containsKey(s.getName())) {
-                throw new SqlClientException("Duplicate source name '" + s + "'.");
+    public void setTables(List> tables) {
+        this.tables = new HashMap<>(tables.size());
+        tables.forEach(config -> {
+            if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
+                throw new SqlClientException("The 'type' attribute of a table is missing.");
--- End diff --

Just curious: is there any chance we need a table to be of both `source` and 
`sink` type (e.g., similar to queryable state, if a user wants to explicitly 
maintain a table for that purpose)?




[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-26 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414618#comment-16414618
 ] 

Shuyi Chen commented on FLINK-9059:
---

Hi [~twalthr], thanks a lot for the comments. The pull request actually 
already uses "type" instead of "tableType". Could you please take a look at 
the PR? Thanks a lot.



[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-26 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413686#comment-16413686
 ] 

Timo Walther commented on FLINK-9059:
-

Thanks for working on this [~suez1224]. Just a minor point about the 
"tableType" property, which I mentioned in another document for the sake of 
consistency:

_Please try to reuse key names as much as possible. Key-names should be 
hierarchical and lower case. Use "-" instead of dots or camel case. E.g., 
connector.schema.start-from = from-earliest. Try not to use the higher level in 
a key-name. E.g., instead of connector.kafka.kafka-version use 
connector.kafka.version._

So maybe we should name it either just {{type}} or {{table-type}}.
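
As a rough illustration of the naming rule quoted above, the check below accepts keys made of lower-case segments separated by dots, with "-" joining words inside a segment. `KeyNames` and `followsConvention` are hypothetical names, the regular expression is one possible reading of the rule rather than anything Flink enforces, and it does not cover the advice about not repeating the higher level in a key name.

```java
import java.util.regex.Pattern;

/** Hypothetical checker for the key-name convention; not part of Flink. */
final class KeyNames {

    /** Lower-case alphanumeric words joined by '-', grouped into levels separated by '.'. */
    private static final Pattern KEY = Pattern.compile(
            "[a-z0-9]+(?:-[a-z0-9]+)*(?:\\.[a-z0-9]+(?:-[a-z0-9]+)*)*");

    static boolean followsConvention(String key) {
        return KEY.matcher(key).matches();
    }

    public static void main(String[] args) {
        for (String key : new String[] {"type", "table-type", "tableType", "connector.schema.start-from"}) {
            System.out.println(key + " -> " + followsConvention(key));
        }
    }
}
```

Under this reading, both {{type}} and {{table-type}} pass while "tableType" is rejected because of the camel case.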

> Add support for unified table source and sink declaration in environment file
> -
>
> Key: FLINK-9059
> URL: https://issues.apache.org/jira/browse/FLINK-9059
> Project: Flink
> Issue Type: Task
> Components: Table API & SQL
> Reporter: Shuyi Chen
> Assignee: Shuyi Chen
> Priority: Major
> Fix For: 1.5.0
>
>
> 1) Add a common property called "tableType" with single value 'source'.
> 2) In the YAML file, replace "sources" with "tables".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412021#comment-16412021
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5758

[FLINK-9059][Table API & SQL] add table type attribute; replace "sources" 
with "tables" in environm…

## What is the purpose of the change

Add support for unified table source and sink declaration in the environment 
file definition.
This change prepares for FLINK-9049 (Create unified interfaces to configure 
and instantiate TableSink). We want to get this change in before 1.5 so that 
it won't break the API in the next Flink release.


## Brief change log

  - Refactor the SQL Client environment file definition to replace "sources" 
with "tables"
  - Add a "type" property to distinguish between table sources and sinks.

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-9059

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5758


commit 6da60ae23d83783137d44e282e06e16c947f0eb7
Author: Shuyi Chen 
Date:   2018-03-23T06:00:00Z

add table type attribute; replace "sources" with "tables" in environment file



