[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123487#comment-16123487
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3712


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123225#comment-16123225
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3712
  
Thanks for the update @haohui.

I'll merge this PR :-)
I have to change some of the type mappings to make it work with 
`JDBCOutputFormat`. I think it would be a good idea to redesign the type 
handling in `JDBCOutputFormat`. I'll open a JIRA for that.

Thanks, Fabian


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120659#comment-16120659
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
Thanks @fhueske ! Updated the PR to address the comments.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119809#comment-16119809
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132168767
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 ---
 
-**TODO**
+### JDBCAppendSink
+
+JDBCAppendSink allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspective. However, you can customize the query using REPLACE or INSERT OVERWRITE to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:
+
+
+
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(new TypeInformation[] {INT_TYPE_INFO})
--- End diff --

change to `setParameterTypes()` if we rename the method.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119811#comment-16119811
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132141735
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * The mechanisms of Flink guarantee delivering messages at-least-once to this sink.
+ * However, one common use case is to run idempotent queries (e.g., REPLACE or
+ * INSERT OVERWRITE) to upsert into the database and achieve exactly-once semantics.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation<?>[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   int[] types = sink.outputFormat.getTypesArray();
+   Preconditions.checkArgument(fieldTypes.length == types.length);
--- End diff --

Give a detailed error message like: `"Schema of output table incompatible 
with JDBCAppendTableSink: expected [type1, type2, type3, ...], actual [type1, 
type2]"`
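For illustration, a minimal sketch of such a check (the helper wrapper is hypothetical; the real check would live inline in `configure()`):

```java
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

class SchemaCheckSketch {
    // Hypothetical helper showing the suggested, more descriptive message.
    static void checkSchema(TypeInformation<?>[] tableTypes, int[] sinkSqlTypes) {
        Preconditions.checkArgument(tableTypes.length == sinkSqlTypes.length,
            "Schema of output table incompatible with JDBCAppendTableSink: expected %s, actual %s",
            Arrays.toString(sinkSqlTypes), Arrays.toString(tableTypes));
    }
}
```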


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119805#comment-16119805
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132146108
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchSize = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
--- End diff --

Add JavaDocs to the public configuration methods.
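For example, the first setter above could be documented roughly like this (a sketch, not the wording that was eventually merged):

```java
/**
 * Specifies the username of the JDBC connection.
 *
 * @param username the database user on whose behalf the connection is made
 * @return this builder, to allow fluent chaining
 */
public JDBCAppendTableSinkBuilder setUsername(String username) {
    this.username = username;
    return this;
}
```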


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119803#comment-16119803
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132142854
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * The mechanisms of Flink guarantee delivering messages at-least-once to this sink.
+ * However, one common use case is to run idempotent queries (e.g., REPLACE or
+ * INSERT OVERWRITE) to upsert into the database and achieve exactly-once semantics.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation<?>[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   int[] types = sink.outputFormat.getTypesArray();
+   Preconditions.checkArgument(fieldTypes.length == types.length);
+   for (int i = 0; i < types.length; ++i) {
+   Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
+   "Incompatible types between fields and JDBC format at " + i);
+   }
+
+   JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
--- End diff --

Passing the reference should be fine, but to be sure we could create a deep 
copy via `SerializationUtils.clone()`
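A sketch of that alternative, assuming `JDBCOutputFormat` is serializable (it is an `OutputFormat`, which extends `java.io.Serializable`) and commons-lang3 is on the classpath:

```java
import org.apache.commons.lang3.SerializationUtils;

// Inside configure(): deep-copy the output format instead of sharing the
// reference, so the returned sink cannot be affected by later mutation.
JDBCOutputFormat clonedFormat = SerializationUtils.clone(sink.outputFormat);
JDBCAppendTableSink copy = new JDBCAppendTableSink(clonedFormat);
```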


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119806#comment-16119806
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132160738
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 ---
 
-**TODO**
+### JDBCAppendSink
+
+JDBCAppendSink allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspective. However, you can customize the query using REPLACE or INSERT OVERWRITE to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:
+
+
+
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(new TypeInformation[] {INT_TYPE_INFO})
+  .build();
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(Array(INT_TYPE_INFO))
--- End diff --

use varargs?
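For illustration, the Java variant of the docs example using the vararg `setFieldTypes(TypeInformation... fieldTypes)` signature that the builder diff elsewhere in this thread already declares (a sketch, not the final docs text):

```java
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;

// With the vararg overload, the docs example no longer needs an explicit array:
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    .setDBUrl("jdbc:derby:memory:ebookshop")
    .setQuery("INSERT INTO books (id) VALUES (?)")
    .setFieldTypes(INT_TYPE_INFO)
    .build();
```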


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119812#comment-16119812
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132166866
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.CHAR_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DATE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+class JDBCTypeUtil {
+   private static final Map<TypeInformation<?>, Integer> BASIC_TYPES;
+
+   static {
+   HashMap<TypeInformation<?>, Integer> m = new HashMap<>();
+   m.put(STRING_TYPE_INFO, Types.VARCHAR);
+   m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
+   m.put(BYTE_TYPE_INFO, Types.TINYINT);
+   m.put(SHORT_TYPE_INFO, Types.SMALLINT);
+   m.put(INT_TYPE_INFO, Types.INTEGER);
+   m.put(LONG_TYPE_INFO, Types.BIGINT);
+   m.put(FLOAT_TYPE_INFO, Types.FLOAT);
+   m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
+   m.put(CHAR_TYPE_INFO, Types.SMALLINT);
--- End diff --

`JDBCOutputFormat` will insert a `SMALLINT` by casting to `short`. This 
cast will fail for `Character`. 

Please double check the type assignment and align it with 
`JDBCOutputFormat.writeRecord()`.
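To make the failure mode concrete, a small self-contained demo (the cast mirrors what the reviewer describes in `JDBCOutputFormat.writeRecord()`; it is not the actual Flink source):

```java
public class CharSmallintCastDemo {
    public static void main(String[] args) {
        // A CHAR_TYPE_INFO field arrives at runtime as a java.lang.Character...
        Object field = Character.valueOf('x');
        // ...but a SMALLINT parameter is bound by casting the field to short,
        // which unboxes via Short and therefore throws ClassCastException:
        short s = (short) field;
        System.out.println(s);  // never reached
    }
}
```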


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119810#comment-16119810
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132146531
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchSize = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
+   this.batchSize = batchSize;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation... fieldTypes) {
+   this.fieldTypes = fieldTypes;
+   return this;
+   }
+
+   /**
+* Finalizes the configuration and checks validity.
+*
+* @return Configured JDBCOutputFormat
+*/
+   public JDBCAppendTableSink build() {
+   Preconditions.checkNotNull(fieldTypes, "Row type is unspecified");
--- End diff --

change error message to `"Types of the query parameters are not specified. Please specify types using the setFieldTypes() method."` (or `setParameterTypes()` if we rename the method).
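Applied to the `build()` method above, the check would read roughly as follows (using the current method name; it becomes `setParameterTypes()` if the rename happens):

```java
Preconditions.checkNotNull(fieldTypes,
    "Types of the query parameters are not specified."
        + " Please specify types using the setFieldTypes() method.");
```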


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119808#comment-16119808
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132157359
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchSize = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
+   this.batchSize = batchSize;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation... fieldTypes) {
--- End diff --

Should we rename the method to `setParameterTypes()` and offer an overloaded version `setParameterTypes(int... paramTypes)` that allows specifying types as `java.sql.Types`?
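A sketch of what the proposed pair could look like (the `int...` overload and its backing field are hypothetical additions; only the `TypeInformation` variant exists in the diff):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Sketch only: the int... overload and the sqlParameterTypes field are hypothetical.
public class JDBCAppendTableSinkBuilderSketch {
    private TypeInformation<?>[] parameterTypes;
    private int[] sqlParameterTypes;

    public JDBCAppendTableSinkBuilderSketch setParameterTypes(TypeInformation<?>... types) {
        this.parameterTypes = types;
        return this;
    }

    /** Overload accepting java.sql.Types constants such as Types.INTEGER or Types.VARCHAR. */
    public JDBCAppendTableSinkBuilderSketch setParameterTypes(int... sqlTypes) {
        this.sqlParameterTypes = sqlTypes;
        return this;
    }
}
```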


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119804#comment-16119804
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132141831
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * The mechanisms of Flink guarantee delivering messages at-least-once to this sink.
+ * However, one common use case is to run idempotent queries (e.g., REPLACE or
+ * INSERT OVERWRITE) to upsert into the database and achieve exactly-once semantics.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation<?>[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   int[] types = sink.outputFormat.getTypesArray();
+   Preconditions.checkArgument(fieldTypes.length == types.length);
+   for (int i = 0; i < types.length; ++i) {
+   Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
--- End diff --

add more details to error message like: `"Schema of output table 
incompatible with JDBCAppendTableSink: expected [type1, type2, type3, ...], 
actual [type1, type2]"`


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119813#comment-16119813
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132162563
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+
+class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+   final JDBCOutputFormat outputFormat;
+
+   JDBCSinkFunction(JDBCOutputFormat outputFormat) {
+   this.outputFormat = outputFormat;
+   }
+
+   @Override
+   public void invoke(Row value) throws Exception {
+   outputFormat.writeRecord(value);
+   }
+
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws Exception {
+   outputFormat.flush();
+   }
+
+   @Override
+   public void initializeState(FunctionInitializationContext context) throws Exception {
+   }
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   RuntimeContext ctx = getRuntimeContext();
+   outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
--- End diff --

add a call `outputFormat.setRuntimeContext(ctx);` before calling `open()`.
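The adjusted `open()` of `JDBCSinkFunction` would then look roughly like this (a sketch; `setRuntimeContext()` is inherited from `RichOutputFormat`):

```java
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    RuntimeContext ctx = getRuntimeContext();
    // Hand the runtime context to the rich output format before opening it.
    outputFormat.setRuntimeContext(ctx);
    outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
```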


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119807#comment-16119807
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132160714
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 ---
 
-**TODO**
+### JDBCAppendSink
+
+JDBCAppendSink allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspective. However, you can customize the query using REPLACE or INSERT OVERWRITE to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:
+
+
+
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(new TypeInformation[] {INT_TYPE_INFO})
--- End diff --

use varargs?


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118969#comment-16118969
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132022891
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation<?>[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
--- End diff --

The `JDBCOutputFormat` is now only constructed via `JDBCAppendableSinkBuilder`, so the types should always match, but it is a good idea to add the checks to catch potential bugs.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093734#comment-16093734
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128218692
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
--- End diff --

Add a comment that `exactly-once` can be achieved by idempotent insert 
operations?
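One possible wording, close to the Javadoc that the updated PR (quoted earlier in this reverse-chronological digest) ends up carrying:

```java
/**
 * An at-least-once Table sink for JDBC.
 *
 * <p>Flink's checkpointing mechanism guarantees at-least-once delivery to this
 * sink. Exactly-once semantics can be achieved with idempotent insert
 * operations (e.g., REPLACE or INSERT OVERWRITE), so that replayed rows simply
 * overwrite the earlier writes.
 */
```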


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093737#comment-16093737
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128219296
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation<?>[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
--- End diff --

We could validate that the types of the `JDBCOutputFormat` match the 
`fieldTypes` which are provided by the optimizer. 
Or do you have concerns regarding such a check?


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093739#comment-16093739
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128220251
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -218,10 +225,7 @@ public void writeRecord(Row row) throws IOException {
@Override
public void close() throws IOException {
try {
-   if (upload != null) {
-   upload.executeBatch();
-   upload.close();
--- End diff --

we should `close()` the `PreparedStatement`
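A sketch of a `close()` that both flushes and releases the statement (the field name `upload` comes from the diff above; `dbConn` and the exact error handling are assumptions):

```java
@Override
public void close() throws IOException {
    try {
        if (upload != null) {
            upload.executeBatch();  // flush any buffered rows
            upload.close();         // release the PreparedStatement
        }
    } catch (SQLException e) {
        throw new IOException("Error while closing JDBC statement.", e);
    } finally {
        upload = null;
        // closing the Connection (dbConn) would follow here, as in the
        // surrounding close() implementation
    }
}
```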


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093736#comment-16093736
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128219589
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchInterval = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
--- End diff --

IMO, interval has a temporal connotation. `batchInterval` -> `batchSize`?


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093738#comment-16093738
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128226313
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+   private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
+
+   @Test
+   public void testAppendTableSink() throws IOException {
+   JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+   .setDrivername("foo")
+   .setDBUrl("bar")
+   .setQuery("insert into %s (id) values (?)")
+   .setFieldTypes(FIELD_TYPES)
+   .build();
+
+   StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
+   doAnswer(new Answer() {
+   @Override
+   public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+   return invocationOnMock.getArguments()[0];
+   }
+   }).when(env).clean(any());
+
+   TypeSerializer<Row> ts = ROW_TYPE.createSerializer(mock(ExecutionConfig.class));
+   FromElementsFunction<Row> func = new FromElementsFunction<>(ts, Row.of("foo"));
+   DataStream<Row> ds = new DataStreamSource<>(env, ROW_TYPE, new StreamSource<>(func), false, "foo");
+   DataStreamSink<Row> dsSink = ds.addSink(sink.getSink());
--- End diff --

I think we should test for the correctness of the `emitDataStream()` method.
Could be done as follows:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
sink.emitDataStream(ds);

Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
assertEquals(1, sinkIds.size());
int sinkId = sinkIds.iterator().next();

StreamSink<Row> planSink = (StreamSink<Row>) env.getStreamGraph().getStreamNode(sinkId).getOperator();
assertSame(sink.getSink(), planSink.getUserFunction());
```


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093735#comment-16093735
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128221151
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchInterval = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
+   this.batchInterval = batchInterval;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation[] fieldTypes) {
--- End diff --

Make `fieldTypes` a vararg for convenience? 

I would think that `java.sql.Types` would be more natural in the context of 
a JDBC sink, but I'm open to `TypeInformation` as well.
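
For illustration, a minimal sketch of the vararg variant; the exact overload is an assumption, not necessarily what was merged:

```
// Sketch only: a vararg overload so callers can write
// setFieldTypes(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
// without building an array by hand.
public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>... fieldTypes) {
	this.fieldTypes = fieldTypes;
	return this;
}
```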


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16082522#comment-16082522
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3712
  
It would also be good to add the `JdbcTableSink` to the documentation 
(incl. an example) once the API is fixed.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16082467#comment-16082467
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r126724241
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction
+   implements AppendStreamTableSink, CheckpointedFunction {
+   private final JDBCOutputFormat outputFormat;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(JDBCOutputFormat outputFormat) {
+   this.outputFormat = outputFormat;
+   }
+
+   @Override
+   public void emitDataStream(DataStream dataStream) {
+   dataStream.addSink(this);
+   }
+
+   @Override
+   public TypeInformation getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+   JDBCTableSink copy = new JDBCTableSink(outputFormat);
+   copy.fieldNames = fieldNames;
+   copy.fieldTypes = fieldTypes;
--- End diff --

We could validate that the types provided by the Table API are compatible 
with the types that the `JdbcOutputFormat` expects to avoid exceptions during 
execution.
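
A hedged sketch of what such a check could look like inside `configure()`; `sqlTypes` (the `java.sql.Types` the output format was built with) and `toSqlType(...)` are illustrative assumptions, not part of the PR:

```
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
	// Fail fast at plan time instead of throwing from the JDBC driver at runtime.
	// toSqlType: hypothetical TypeInformation -> java.sql.Types mapping;
	// sqlTypes: assumed to hold the types the JDBCOutputFormat expects.
	for (int i = 0; i < fieldTypes.length; i++) {
		if (sqlTypes[i] != toSqlType(fieldTypes[i])) {
			throw new IllegalArgumentException(
				"Field '" + fieldNames[i] + "' is not compatible with the JDBC schema.");
		}
	}
	JDBCTableSink copy = new JDBCTableSink(outputFormat);
	copy.fieldNames = fieldNames;
	copy.fieldTypes = fieldTypes;
	return copy;
}
```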


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16082469#comment-16082469
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r126736740
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for JDBCTableSink.
+ */
+public class JDBCTableSinkTest extends JDBCTestBase {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+
+
+   private JDBCOutputFormat jdbcOutputFormat;
+
+   @After
+   public void tearDown() throws IOException {
+   if (jdbcOutputFormat != null) {
+   jdbcOutputFormat.close();
+   }
+   jdbcOutputFormat = null;
+   }
+
+   @Test
+   public void testFlush() throws Exception {
--- End diff --

This rather tests the `JdbcOutputFormat.flush()` method. I would move (and 
adapt) this test to the `JdbcOutputFormatTest`.

For the `JdbcTableSink` we need tests that check the configuration (which 
is done below) and a check that the `emitDataStream()` method (and the returned 
`SinkFunction`) is working correctly. I would do this by calling 
`emitDataStream()` with a mocked `DataStream` and fetching the 
`SinkFunction` from the returned `DataStreamSink` 
(`sink.getTransformation().getOperator().getUserFunction()`).


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16082468#comment-16082468
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r126718250
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction
--- End diff --

I would not extend `RichSinkFunction`. Although this might work in 
practice, I think this mixes the logical representation of a table (in the 
catalog and during optimization) with the actual runtime code. I'd rather 
implement a separate JdbcSinkFunction (within this file) and instantiate it in 
`emitDataStream()`.

I also think that we should implement the `BatchTableSink` interface which 
would directly use the `JdbcOutputFormat`.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055876#comment-16055876
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3712
  
Yes, I think that would work well. We could provide a builder for the table 
sink and forward calls to the `JDBCOutputFormatBuilder`. 

Regarding the query template: we can also make it an optional parameter to 
override the standard `INSERT INTO ... VALUES` template if set. 

If you want the sink to support upsert writes you might want to implement 
the `UpsertStreamTableSink` rather than the `AppendStreamTableSink`. The 
`UpsertStreamTableSink` supports append writes as well as updates and deletes 
by exposing the unique key fields of a `Table`. 
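
A rough sketch of the builder-forwarding idea with the optional query template described above; `tableName`, `fieldNames`, and `buildInsertQuery(...)` are illustrative assumptions (the builder that was eventually merged is quoted further above):

```
// Sketch only: build() forwards the collected settings to the
// JDBCOutputFormatBuilder and falls back to a generated
// "INSERT INTO table (f1, ...) VALUES (?, ...)" template when no
// custom query was set.
public JDBCAppendTableSink build() {
	String insertQuery = (query != null)
		? query
		: buildInsertQuery(tableName, fieldNames); // hypothetical helper
	JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
		.setDrivername(driverName)
		.setDBUrl(dbURL)
		.setUsername(username)
		.setPassword(password)
		.setQuery(insertQuery)
		.finish();
	return new JDBCAppendTableSink(format); // assumed constructor
}
```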


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055793#comment-16055793
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
It is important to have some flexibility on the query, as different SQL 
engines have slightly different DML syntax.

For example, SQLite supports INSERT OR REPLACE whereas MySQL supports REPLACE 
INTO to upsert records with unique keys.

I like the APIs you proposed. Do you think it addresses your concerns if the 
sink forwards the parameters to `JDBCOutputFormat` and constructs it internally?


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055738#comment-16055738
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r122970495
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction
+   implements AppendStreamTableSink, CheckpointedFunction {
+   private final JDBCOutputFormat outputFormat;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(JDBCOutputFormat outputFormat) {
--- End diff --

What do you think about not exposing the `JDBCOutputFormat` to the user, 
but configuring it internally?

Of course we would need many of the configuration parameters (user, pw, 
driver, dburl, and table name). Users could either specify the field names of the 
table to write to (fields are mapped by position) or not (we use the field 
names of the `Table` to emit). From this information we can construct a 
parameterized insert query: `INSERT INTO $table ($f1, $f2, $f3) VALUES (?, ?, 
?)`. The field types are automatically provided by the `configure()` call.

This would be a tighter integration with the Table API (using provided 
field types and possibly field names).
Does this work for your use case or do you need the flexibility of 
specifying your own query?
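
A small self-contained sketch of deriving such a template from the table name and field names (method name and placement are assumptions):

```
// Sketch only: derive "INSERT INTO t (f1, f2) VALUES (?, ?)" from the
// table name and the field names provided by the Table API.
static String buildInsertQuery(String table, String[] fieldNames) {
	StringBuilder columns = new StringBuilder();
	StringBuilder placeholders = new StringBuilder();
	for (int i = 0; i < fieldNames.length; i++) {
		if (i > 0) {
			columns.append(", ");
			placeholders.append(", ");
		}
		columns.append(fieldNames[i]);
		placeholders.append('?');
	}
	return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
}
```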


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055740#comment-16055740
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r122972333
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction
--- End diff --

I think it could also be easily extended to support batch output by 
implementing the `BatchTableSink` interface and implementing 
`emitDataSet(dataSet: DataSet[Row])` as
```
def emitDataSet(dataSet: DataSet[Row]): Unit = {
  dataSet.output(jdbcOutputFormat)
}
```



> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055739#comment-16055739
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r122971462
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction
--- End diff --

Rename to `JDBCAppendTableSink`?


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054041#comment-16054041
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3712
  
Thanks @zentol! 
I'll have a look at it as well.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16053873#comment-16053873
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3712
  
+1.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16041825#comment-16041825
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120765156
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
upload.addBatch();
batchCount++;
if (batchCount >= batchInterval) {
-   upload.executeBatch();
-   batchCount = 0;
+   flush();
}
} catch (SQLException | IllegalArgumentException e) {
throw new IllegalArgumentException("writeRecord() failed", e);
}
}
 
+   void flush() throws SQLException {
+   if (upload != null) {
+   upload.executeBatch();
--- End diff --

It is a synchronous call. It will throw `SQLException` and abort the sink. 
The behavior has not been changed.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16040576#comment-16040576
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120576032
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for JDBCTableSink.
+ */
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+
+   @Test
+   public void testOutputSink() throws Exception {
+   JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
+   JDBCTableSink sink = new JDBCTableSink(outputFormat);
+   @SuppressWarnings("unchecked")
+   DataStream dataStream = (DataStream) mock(DataStream.class);
+   sink.emitDataStream(dataStream);
+   verify(dataStream).addSink(sink);
--- End diff --

You don't have to test this, as it is not a detail of the JDBCTableSink but 
of the Table API.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16040578#comment-16040578
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120575757
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for JDBCTableSink.
+ */
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+
+   @Test
+   public void testOutputSink() throws Exception {
+   JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
+   JDBCTableSink sink = new JDBCTableSink(outputFormat);
+   @SuppressWarnings("unchecked")
+   DataStream dataStream = (DataStream) mock(DataStream.class);
+   sink.emitDataStream(dataStream);
+   verify(dataStream).addSink(sink);
+   }
+
+   @Test
+   public void testFlush() throws Exception {
+   JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
+   JDBCTableSink sink = new JDBCTableSink(outputFormat);
+   @SuppressWarnings("unchecked")
+   DataStream dataStream = (DataStream) mock(DataStream.class);
+   sink.emitDataStream(dataStream);
+   sink.snapshotState(mock(FunctionSnapshotContext.class));
+   verify(dataStream).addSink(sink);
+   verify(outputFormat).flush();
--- End diff --

Let's not use mocking for this test. Just create an actual format/sink, 
give N values to the sink where N < batchSize, verify they haven't been written 
yet, call flush, and verify they were written.
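
A hedged sketch of such a test; `DRIVER_CLASS`, `DB_URL`, `OUTPUT_TABLE`, and `countRows(...)` are assumed helpers/constants from the test base, named here for illustration:

```
@Test
public void testFlush() throws Exception {
	// Batch interval of 5, but only 3 records written: nothing should
	// reach the database until flush() is called explicitly.
	JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
		.setDrivername(DRIVER_CLASS)
		.setDBUrl(DB_URL)
		.setQuery("INSERT INTO " + OUTPUT_TABLE + " (id) VALUES (?)")
		.setBatchInterval(5)
		.finish();
	format.open(0, 1);

	for (int i = 0; i < 3; i++) {
		format.writeRecord(Row.of(i));
	}
	assertEquals(0, countRows(OUTPUT_TABLE)); // still buffered in the batch

	format.flush();
	assertEquals(3, countRows(OUTPUT_TABLE)); // batch was pushed out

	format.close();
}
```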


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16040577#comment-16040577
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120576118
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
upload.addBatch();
batchCount++;
if (batchCount >= batchInterval) {
-   upload.executeBatch();
-   batchCount = 0;
+   flush();
}
} catch (SQLException | IllegalArgumentException e) {
throw new IllegalArgumentException("writeRecord() failed", e);
}
}
 
+   void flush() throws SQLException {
+   if (upload != null) {
+   upload.executeBatch();
--- End diff --

It's been a while since I worked with JDBC; I take it this is a synchronous 
call? What happens if this call fails?


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-06-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16037544#comment-16037544
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
@fhueske @zentol can you please take another look? Thanks.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15987408#comment-15987408
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
Correct me if I'm wrong -- will something like the following work?

```
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+   outputFormat.flush();
+   }
```


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986917#comment-15986917
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3712
  
Great, just wanted to make sure we're on the same page.

Why did you revert the usage of the GenericWriteAheadSink? Now we're back 
to where we started, without any guarantee that data is written when a 
checkpoint completes.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986872#comment-15986872
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3712
  
I think an at-least-once sink with support for upserts would also be very 
useful.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985498#comment-15985498
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
Thanks for the pointer to the prototype!

> Do you intend to provide exactly-once guarantees for arbitrary updates?

As I think about it a little bit more, I think it might make sense to start 
with the at-least-once semantics first. In practice we make the JDBC call 
idempotent using `INSERT IF NOT EXISTS`.

The exactly-once part is trickier, so let's separate it out for now. 
What do you think?
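
Regarding the idempotent JDBC call above: the exact spelling is dialect-dependent (SQLite has `INSERT OR IGNORE`, MySQL has `INSERT IGNORE`), and such a statement can simply be passed as the query; table and columns below are illustrative:

```
// Sketch only: replayed records hit the unique key and are ignored,
// which makes at-least-once delivery effectively idempotent.
JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
	.setDrivername("org.sqlite.JDBC")
	.setDBUrl("jdbc:sqlite:/tmp/example.db")
	.setQuery("INSERT OR IGNORE INTO events (id, payload) VALUES (?, ?)")
	.finish();
```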



> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984520#comment-15984520
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113412006
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer,
+   JDBCOutputFormat outputFormat, String[] fieldNames,
+   TypeInformation[] fieldTypes) throws Exception {
+   super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+   this.outputFormat = outputFormat;
+   this.committer = committer;
+   this.fieldNames = fieldNames;
+   this.fieldTypes = fieldTypes;
+   }
+
+   @Override
+   public void emitDataStream(DataStream dataStream) {
+   dataStream.transform("JDBC Sink", getOutputType(), this);
+   }
+
+   @Override
+   public TypeInformation getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+   try {
+   return new JDBCTableSink(committer, serializer, outputFormat, fieldNames, fieldTypes);
+   } catch (Exception e) {
+   LOG.warn("Failed to create a copy of the sink.", e);
+   return null;
+   }
+   }
+
+   @Override
+   protected boolean sendValues(Iterable value, long timestamp) throws Exception {
+   for (Row r : value) {
+   try {
+   outputFormat.writeRecord(r);
--- End diff --

This doesn't guarantee in any way that the values are actually being sent; 
you need some kind of flushing functionality for this to work properly.
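
A minimal sketch of `sendValues()` with an explicit flush, assuming the `flush()` method that was later added to `JDBCOutputFormat`:

```
@Override
protected boolean sendValues(Iterable<Row> values, long timestamp) throws Exception {
	for (Row r : values) {
		outputFormat.writeRecord(r); // only buffers the row into the JDBC batch
	}
	// Push the buffered batch to the database before acknowledging the
	// checkpoint; otherwise records may still sit in the batch buffer.
	outputFormat.flush();
	return true;
}
```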


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984519#comment-15984519
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410894
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer,
+   JDBCOutputFormat outputFormat, String[] fieldNames,
+   TypeInformation[] fieldTypes) throws Exception {
--- End diff --

Like the Cassandra sink, the `fieldNames/Types` should be removed to provide 
a clean API to the user.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984517#comment-15984517
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410993
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[] {"foo"};
--- End diff --

remove space after `[]`.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984518#comment-15984518
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113412806
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer,
--- End diff --

I would propose either adding a JDBCCheckpointCommitter that cooperates 
with the sink (as seen in this 
[prototype](https://github.com/zentol/flink/commit/92e878b59a7371ac9cad402d0b009c7439cd1900))
 or omitting the `CheckpointCommitter` argument and providing a dummy to the 
`GenericWriteAheadSink`.
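
A sketch of what such a dummy could look like; the method set follows the `CheckpointCommitter` base class of that era, so treat the exact signatures as an assumption:

```
// Sketch only: a no-op committer that never records a checkpoint as
// committed, so the GenericWriteAheadSink replays pending records on
// recovery (at-least-once rather than exactly-once).
public class NoOpCheckpointCommitter extends CheckpointCommitter {

	@Override
	public void open() throws Exception {}

	@Override
	public void close() throws Exception {}

	@Override
	public void createResource() throws Exception {}

	@Override
	public void commitCheckpoint(int subtaskIdx, long checkpointID) {}

	@Override
	public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) {
		return false;
	}
}
```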


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984521#comment-15984521
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113411053
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[] {"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
--- End diff --

Remove the space after `[]`; move `STRING_TYPE_INFO` to this line.
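
Applying both suggestions, the declaration would presumably read 
(reconstructed; the remainder of the quoted diff is cut off):

```java
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO};
```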


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981263#comment-15981263
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3712
  
The `GenericWriteAheadSink` would work for this. It can be implemented just 
like the `CassandraWriteAheadSink`. Since we can also use transactions, it can 
even be a bit more sophisticated.
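
As a hedged illustration of what "a bit more sophisticated" could look 
like: `GenericWriteAheadSink` hands each completed checkpoint's buffered 
rows to `sendValues()`, and a JDBC implementation could wrap that replay in 
a single transaction (sketch only; assumes the three-argument `sendValues` 
of the 1.3-era API, and `connection`/`insertQuery` are hypothetical fields):

```java
@Override
protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp) throws Exception {
    connection.setAutoCommit(false);
    try (PreparedStatement stmt = connection.prepareStatement(insertQuery)) {
        for (Row row : values) {
            for (int i = 0; i < row.getArity(); i++) {
                stmt.setObject(i + 1, row.getField(i));
            }
            stmt.addBatch();
        }
        stmt.executeBatch();
        // All rows of the checkpoint become visible atomically; returning
        // true lets the sink mark the checkpoint as committed.
        connection.commit();
        return true;
    } catch (SQLException e) {
        // Roll back and return false so the sink retries the whole
        // checkpoint later instead of leaving a half-written batch behind.
        connection.rollback();
        return false;
    }
}
```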


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981228#comment-15981228
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3712
  
There is `GenericWriteAheadSink`, which buffers elements and writes to 
Cassandra. I think this needs some more thought, maybe a design outline on the 
JIRA issue. I'm also not 100% sure if the generic write-ahead sink will work 
for this. @zentol might have a better answer, though.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979665#comment-15979665
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
Thanks for the review. I was not aware that OutputFormats are not 
integrated with Flink's checkpointing mechanism.

To address this problem, maybe we can do something similar to the 
`FlinkKafkaProducerBase`? What do you think, @zentol and @aljoscha?
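
For context, the pattern in `FlinkKafkaProducerBase` is "flush on 
checkpoint": the sink implements `CheckpointedFunction` and drains its 
buffer in `snapshotState()`. A hedged sketch of the same idea transplanted 
to JDBC (the class name and the `flush()` method on the output format are 
assumptions, not existing API):

```java
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Hypothetical sink function; assumes JDBCOutputFormat exposes a flush()
// that executes the currently buffered batch.
public class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {

    private final JDBCOutputFormat outputFormat;

    public JDBCSinkFunction(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void invoke(Row value) throws Exception {
        // Buffers the row; the actual write happens in batches.
        outputFormat.writeRecord(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Like FlinkKafkaProducerBase: drain the buffer so that no record
        // that arrived before this checkpoint can be lost afterwards.
        outputFormat.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // No operator state needed: the buffer is empty at checkpoint time.
    }
}
```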


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978358#comment-15978358
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3712
  
Hi @haohui, I think a JdbcTableSink would be a great feature! 

However, there is a big issue with wrapping the `JdbcOutputFormat`. 
OutputFormats are not integrated with Flink's checkpointing mechanism. The 
`JdbcOutputFormat` buffers rows to write them out in batches. Rows that 
arrived before the last checkpoint but are still sitting in the buffer will 
be lost in case of a failure, because they will not be replayed.

The JdbcTableSink should be integrated with Flink's checkpointing 
mechanism. In a nutshell, it should buffer records and commit them to the 
database when a checkpoint is taken. I think we need to think a bit more about 
a proper design for this feature. @zentol and @aljoscha might have some 
thoughts on this as well, as they are more familiar with the implementation of 
checkpoint-aware sinks.

What do you think?
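
To make the failure window concrete, here is a schematic (not the actual 
source) of the batching inside `JDBCOutputFormat.writeRecord()`; `upload`, 
`batchCount`, and `batchInterval` stand in for the connector's internal 
state:

```java
public void writeRecord(Row row) throws IOException {
    try {
        for (int i = 0; i < row.getArity(); i++) {
            upload.setObject(i + 1, row.getField(i)); // upload: the PreparedStatement
        }
        upload.addBatch();                   // staged in JVM memory only
        if (++batchCount >= batchInterval) {
            upload.executeBatch();           // rows reach the database only here
            batchCount = 0;
        }
    } catch (SQLException e) {
        throw new IOException("Batched insert failed", e);
    }
}
```

A failure between two `executeBatch()` calls silently drops everything 
staged since the last one, which is exactly the gap that checkpoint 
integration has to close.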


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965470#comment-15965470
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3712

[FLINK-6281] Create TableSink for JDBC.

This PR implements the `StreamTableSink` interface for the JDBC connectors 
so that the streaming SQL APIs can directly interact with them.
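
For readers unfamiliar with the interface: `StreamTableSink` is `TableSink` 
plus an `emitDataStream()` method, so the sink's job is to describe its 
schema and forward the `DataStream<Row>` to a JDBC writer. A hedged outline 
(class name hypothetical; the actual PR wires the JDBC connector into 
`emitDataStream()`):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

// Outline only; the PR's actual class additionally carries the JDBC
// connection settings and the checkpointing machinery discussed above.
public class JdbcTableSinkOutline implements StreamTableSink<Row> {

    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // The real sink attaches a JDBC-writing sink function here,
        // e.g. dataStream.addSink(...); printing is just a stand-in.
        dataStream.print();
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public StreamTableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // configure() must return a configured copy; the Table API calls
        // it with the schema of the table being written.
        JdbcTableSinkOutline copy = new JdbcTableSinkOutline();
        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }
}
```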

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-6281

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3712.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3712


commit 2bfd014bedc5bd64f346652dfb5ddb41cc36cc3f
Author: Haohui Mai 
Date:   2017-04-12T06:56:56Z

[FLINK-6281] Create TableSink for JDBC.




> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)