[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3712




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132141735
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * The mechanisms of Flink guarantee at-least-once delivery of messages to this sink.
+ * However, one common use case is to run idempotent queries (e.g., REPLACE or
+ * INSERT OVERWRITE) to upsert into the database and achieve exactly-once semantics.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   int[] types = sink.outputFormat.getTypesArray();
+   Preconditions.checkArgument(fieldTypes.length == types.length);
--- End diff --

Give a detailed error message like: `"Schema of output table incompatible with JDBCAppendTableSink: expected [type1, type2, type3, ...], actual [type1, type2]"`
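For illustration, the length check with such a message could look like this (a sketch; the `Arrays.toString` rendering is one possible reading of the suggestion and would need a `java.util.Arrays` import):

```
int[] types = sink.outputFormat.getTypesArray();
Preconditions.checkArgument(fieldTypes.length == types.length,
	"Schema of output table incompatible with JDBCAppendTableSink: expected "
		+ Arrays.toString(types) + ", actual " + Arrays.toString(fieldTypes));
```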




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132157359
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchSize = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
+   this.batchSize = batchSize;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation... fieldTypes) {
--- End diff --

Should we rename the method to `setParameterTypes()` and offer an overloaded version `setParameterTypes(int... paramTypes)` that allows specifying types as `java.sql.Types`?
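A sketch of what that could look like (the names follow the suggestion; the `parameterTypes` field for SQL type codes is hypothetical and not part of the PR):

```
public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation... paramTypes) {
	this.fieldTypes = paramTypes;
	return this;
}

public JDBCAppendTableSinkBuilder setParameterTypes(int... paramTypes) {
	// accepts java.sql.Types constants, e.g. setParameterTypes(Types.INTEGER, Types.VARCHAR)
	this.parameterTypes = paramTypes;  // hypothetical field holding raw SQL type codes
	return this;
}
```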




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132141831
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * The mechanisms of Flink guarantee at-least-once delivery of messages to this sink.
+ * However, one common use case is to run idempotent queries (e.g., REPLACE or
+ * INSERT OVERWRITE) to upsert into the database and achieve exactly-once semantics.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   int[] types = sink.outputFormat.getTypesArray();
+   Preconditions.checkArgument(fieldTypes.length == types.length);
+   for (int i = 0; i < types.length; ++i) {
+   Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
--- End diff --

add more details to error message like: `"Schema of output table incompatible with JDBCAppendTableSink: expected [type1, type2, type3, ...], actual [type1, type2]"`




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132168767
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 ---
 
-**TODO**
+### JDBCAppendSink
+
+JDBCAppendSink allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspective. However, you can customize the query using REPLACE or INSERT OVERWRITE to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:
+
+
+
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(new TypeInformation[] {INT_TYPE_INFO})
--- End diff --

change to `setParameterTypes()` if we rename the method.




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132160714
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 ---
 
-**TODO**
+### JDBCAppendSink
+
+JDBCAppendSink allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspective. However, you can customize the query using REPLACE or INSERT OVERWRITE to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:
+
+
+
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(new TypeInformation[] {INT_TYPE_INFO})
--- End diff --

use varargs?
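Since the builder's `setFieldTypes` is declared as a vararg elsewhere in this PR, the documented call could then be shortened (sketch):

```
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setFieldTypes(INT_TYPE_INFO)  // vararg: no explicit array needed
  .build();
```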




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132162563
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+
+class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+   final JDBCOutputFormat outputFormat;
+
+   JDBCSinkFunction(JDBCOutputFormat outputFormat) {
+   this.outputFormat = outputFormat;
+   }
+
+   @Override
+   public void invoke(Row value) throws Exception {
+   outputFormat.writeRecord(value);
+   }
+
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws Exception {
+   outputFormat.flush();
+   }
+
+   @Override
+   public void initializeState(FunctionInitializationContext context) throws Exception {
+   }
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   RuntimeContext ctx = getRuntimeContext();
+   outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
--- End diff --

add a call `outputFormat.setRuntimeContext(ctx);` before calling `open()`.
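Applied to the method above, the change would read (a sketch; `setRuntimeContext(...)` is inherited by `JDBCOutputFormat` from `RichOutputFormat`):

```
@Override
public void open(Configuration parameters) throws Exception {
	super.open(parameters);
	RuntimeContext ctx = getRuntimeContext();
	// hand the runtime context to the wrapped output format before opening it
	outputFormat.setRuntimeContext(ctx);
	outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
```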




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132146108
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchSize = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
--- End diff --

Add JavaDocs to the public configuration methods.
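For example, the username setter could be documented like this (a sketch; the wording is illustrative, not from the PR):

```
/**
 * Sets the username for the JDBC connection.
 *
 * @param username user name for the database connection
 * @return this builder, for method chaining
 */
public JDBCAppendTableSinkBuilder setUsername(String username) {
	this.username = username;
	return this;
}
```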




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132146531
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchSize = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
+   this.batchSize = batchSize;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation... fieldTypes) {
+   this.fieldTypes = fieldTypes;
+   return this;
+   }
+
+   /**
+* Finalizes the configuration and checks validity.
+*
+* @return Configured JDBCOutputFormat
+*/
+   public JDBCAppendTableSink build() {
+   Preconditions.checkNotNull(fieldTypes, "Row type is unspecified");
--- End diff --

change error message to `"Types of the query parameters are not specified. Please specify types using the setFieldTypes() method."` (or `setParameterTypes()` if we rename the method).
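That is, a sketch of the adjusted check:

```
Preconditions.checkNotNull(fieldTypes,
	"Types of the query parameters are not specified. "
		+ "Please specify types using the setFieldTypes() method.");
```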




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132160738
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 ---
 
-**TODO**
+### JDBCAppendSink
+
+JDBCAppendSink allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspective. However, you can customize the query using REPLACE or INSERT OVERWRITE to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:
+
+
+
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(new TypeInformation[] {INT_TYPE_INFO})
+  .build();
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setFieldTypes(Array(INT_TYPE_INFO))
--- End diff --

use varargs?




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132142854
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * The mechanisms of Flink guarantee at-least-once delivery of messages to this sink.
+ * However, one common use case is to run idempotent queries (e.g., REPLACE or
+ * INSERT OVERWRITE) to upsert into the database and achieve exactly-once semantics.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   int[] types = sink.outputFormat.getTypesArray();
+   Preconditions.checkArgument(fieldTypes.length == types.length);
+   for (int i = 0; i < types.length; ++i) {
+   Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
+   "Incompatible types between fields and JDBC format at " + i);
+   }
+
+   JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
--- End diff --

Passing the reference should be fine, but to be sure we could create a deep copy via `SerializationUtils.clone()`
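A sketch of that variant (assumes `JDBCOutputFormat` is serializable, which Flink's `OutputFormat` interface requires, and commons-lang's `SerializationUtils` on the classpath):

```
JDBCAppendTableSink copy =
	new JDBCAppendTableSink(SerializationUtils.clone(sink.outputFormat));
```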




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132166866
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.CHAR_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DATE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+class JDBCTypeUtil {
+   private static final Map<TypeInformation<?>, Integer> BASIC_TYPES;
+
+   static {
+   HashMap<TypeInformation<?>, Integer> m = new HashMap<>();
+   m.put(STRING_TYPE_INFO, Types.VARCHAR);
+   m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
+   m.put(BYTE_TYPE_INFO, Types.TINYINT);
+   m.put(SHORT_TYPE_INFO, Types.SMALLINT);
+   m.put(INT_TYPE_INFO, Types.INTEGER);
+   m.put(LONG_TYPE_INFO, Types.BIGINT);
+   m.put(FLOAT_TYPE_INFO, Types.FLOAT);
+   m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
+   m.put(CHAR_TYPE_INFO, Types.SMALLINT);
--- End diff --

`JDBCOutputFormat` will insert a `SMALLINT` by casting to `short`. This cast will fail for `Character`.

Please double check the type assignment and align it with `JDBCOutputFormat.writeRecord()`.
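If the map rather than `writeRecord()` is adjusted, one option would be a character SQL type for `CHAR` (a sketch of a possible fix, not the reviewer's prescription):

```
m.put(CHAR_TYPE_INFO, Types.CHAR);  // avoids casting Character to short in writeRecord()
```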




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-08-08 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r132022891
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
--- End diff --

The `JDBCOutputFormat` is now only constructed via the `JDBCAppendTableSinkBuilder`, so the types should always match, but it is a good idea to add the checks to catch potential bugs.




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128221151
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchInterval = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
+   this.batchInterval = batchInterval;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation[] fieldTypes) {
--- End diff --

Make `fieldTypes` a vararg for convenience?

I would think that `java.sql.Types` would be more natural in the context of a JDBC sink, but I'm open to `TypeInformation` as well.
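The vararg variant is a one-line signature change (sketch; a later revision of this PR adopts exactly this shape):

```
public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation... fieldTypes) {
	this.fieldTypes = fieldTypes;
	return this;
}
```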




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128220251
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -218,10 +225,7 @@ public void writeRecord(Row row) throws IOException {
@Override
public void close() throws IOException {
try {
-   if (upload != null) {
-   upload.executeBatch();
-   upload.close();
--- End diff --

we should `close()` the `PreparedStatement`
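A sketch of a `close()` that still flushes and closes the statement (the field name `upload` follows the diff above; the exception wrapping is an assumption and needs `java.sql.SQLException`):

```
@Override
public void close() throws IOException {
	try {
		if (upload != null) {
			upload.executeBatch();  // flush any pending batched rows
			upload.close();         // release the PreparedStatement
		}
	} catch (SQLException e) {
		throw new IOException("Error while closing JDBC statement.", e);
	} finally {
		upload = null;
	}
}
```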




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128218692
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
--- End diff --

Add a comment that `exactly-once` can be achieved by idempotent insert operations?




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128219589
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+   private String username;
+   private String password;
+   private String driverName;
+   private String dbURL;
+   private String query;
+   private int batchInterval = DEFAULT_BATCH_INTERVAL;
+   private TypeInformation[] fieldTypes;
+
+   public JDBCAppendTableSinkBuilder setUsername(String username) {
+   this.username = username;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setPassword(String password) {
+   this.password = password;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+   this.driverName = drivername;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+   this.dbURL = dbURL;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setQuery(String query) {
+   this.query = query;
+   return this;
+   }
+
+   public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
--- End diff --

IMO, interval has a temporal connotation. `batchInterval` -> `batchSize`?




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128219296
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+   private final JDBCSinkFunction sink;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+   this.sink = new JDBCSinkFunction(outputFormat);
+   }
+
+   public static JDBCAppendTableSinkBuilder builder() {
+   return new JDBCAppendTableSinkBuilder();
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(sink);
+   }
+
+   @Override
+   public void emitDataSet(DataSet<Row> dataSet) {
+   dataSet.output(sink.outputFormat);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
--- End diff --

We could validate that the types of the `JDBCOutputFormat` match the `fieldTypes` which are provided by the optimizer.
Or do you have concerns regarding such a check?




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r128226313
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+   private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
+
+   @Test
+   public void testAppendTableSink() throws IOException {
+   JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+   .setDrivername("foo")
+   .setDBUrl("bar")
+   .setQuery("insert into %s (id) values (?)")
+   .setFieldTypes(FIELD_TYPES)
+   .build();
+
+   StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
+   doAnswer(new Answer() {
+   @Override
+   public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+   return invocationOnMock.getArguments()[0];
+   }
+   }).when(env).clean(any());
+
+   TypeSerializer<Row> ts = ROW_TYPE.createSerializer(mock(ExecutionConfig.class));
+   FromElementsFunction<Row> func = new FromElementsFunction<>(ts, Row.of("foo"));
+   DataStream<Row> ds = new DataStreamSource<>(env, ROW_TYPE, new StreamSource<>(func), false, "foo");
+   DataStreamSink<Row> dsSink = ds.addSink(sink.getSink());
--- End diff --

I think we should test for the correctness of the `emitDataStream()` method.
Could be done as follows:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
sink.emitDataStream(ds);

Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
assertEquals(1, sinkIds.size());
int sinkId = sinkIds.iterator().next();

StreamSink<Row> planSink = (StreamSink<Row>) env.getStreamGraph().getStreamNode(sinkId).getOperator();
assertSame(sink.getSink(), planSink.getUserFunction());
```



[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r126718250
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction<Row>
--- End diff --

I would not extend `RichSinkFunction`. Although this might work in practice, I think this mixes the logical representation of a table (in the catalog and during optimization) with the actual runtime code. I'd rather implement a separate JdbcSinkFunction (within this file) and instantiate it in `emitDataStream()`.

I also think that we should implement the `BatchTableSink` interface which would directly use the `JdbcOutputFormat`.
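The shape of that split would be roughly the following (a sketch; it is essentially what the PR's later `JDBCSinkFunction` looks like):

```
class JdbcSinkFunction extends RichSinkFunction<Row> {
	private final JDBCOutputFormat outputFormat;

	JdbcSinkFunction(JDBCOutputFormat outputFormat) {
		this.outputFormat = outputFormat;
	}

	@Override
	public void invoke(Row value) throws Exception {
		// delegate the actual writing to the output format
		outputFormat.writeRecord(value);
	}
}
```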




[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r126724241
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction<Row>
+   implements AppendStreamTableSink<Row>, CheckpointedFunction {
+   private final JDBCOutputFormat outputFormat;
+
+   private String[] fieldNames;
+   private TypeInformation<?>[] fieldTypes;
+
+   public JDBCTableSink(JDBCOutputFormat outputFormat) {
+   this.outputFormat = outputFormat;
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.addSink(this);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   JDBCTableSink copy = new JDBCTableSink(outputFormat);
+   copy.fieldNames = fieldNames;
+   copy.fieldTypes = fieldTypes;
--- End diff --

We could validate that the types provided by the Table API are compatible 
with the types that the `JdbcOutputFormat` expects to avoid exceptions during 
execution.
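
As a sketch of what such a validation could look like inside `configure()` (the `getSqlTypes()` accessor on `JDBCOutputFormat` is hypothetical, added here only for illustration):

```
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    // Hypothetical accessor: the SQL types the output format was built with.
    int expected = outputFormat.getSqlTypes().length;
    if (fieldTypes.length != expected) {
        throw new IllegalArgumentException(
            "Table schema has " + fieldTypes.length
            + " fields, but the JDBC insert statement expects " + expected + ".");
    }
    JDBCTableSink copy = new JDBCTableSink(outputFormat);
    copy.fieldNames = fieldNames;
    copy.fieldTypes = fieldTypes;
    return copy;
}
```

A stricter variant could also map each `TypeInformation` to its expected `java.sql.Types` constant and compare per field.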


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-07-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r126736740
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for JDBCTableSink.
+ */
+public class JDBCTableSinkTest extends JDBCTestBase {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+
+
+   private JDBCOutputFormat jdbcOutputFormat;
+
+   @After
+   public void tearDown() throws IOException {
+   if (jdbcOutputFormat != null) {
+   jdbcOutputFormat.close();
+   }
+   jdbcOutputFormat = null;
+   }
+
+   @Test
+   public void testFlush() throws Exception {
--- End diff --

This tests rather the `JdbcOutputFormat.flush()` method. I would move (and 
adapt) this test to the `JdbcOutputFormatTest`.

For the `JdbcTableSink` we need tests that check the configuration (which 
is done below) and a check that the `emitDataStream()` method (and the returned 
`SinkFunction`) is working correctly. I would do this by calling 
`emitDataStream()` with a mocked `DataStream` and fetching the 
`SinkFunction` from the returned `DataStreamSink` 
(`sink.getTransformation().getOperator().getUserFunction()`).
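
For illustration, a mock-based variant of that check using a Mockito `ArgumentCaptor` instead of unwrapping the transformation (test setup names are assumptions, not PR code; this additionally needs `org.mockito.ArgumentCaptor`, `org.apache.flink.streaming.api.functions.sink.SinkFunction`, and `org.junit.Assert.assertSame`):

```
@Test
public void testEmitDataStreamAttachesSinkFunction() throws Exception {
    JDBCTableSink tableSink = new JDBCTableSink(jdbcOutputFormat);

    @SuppressWarnings("unchecked")
    DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
    tableSink.emitDataStream(dataStream);

    // Capture whatever SinkFunction emitDataStream() handed to addSink().
    ArgumentCaptor<SinkFunction> captor = ArgumentCaptor.forClass(SinkFunction.class);
    verify(dataStream).addSink(captor.capture());

    // In the version under review the table sink registers itself; after the
    // suggested refactoring this would be a separate JDBCSinkFunction instead.
    assertSame(tableSink, captor.getValue());
}
```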


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-06-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r122971462
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction<Row>
--- End diff --

Rename to `JDBCAppendTableSink`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-06-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r122972333
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction<Row>
--- End diff --

I think it could also be easily extended to support batch output by 
implementing the `BatchTableSink` interface and implementing 
`emitDataSet(dataSet: DataSet[Row])` as
```
def emitDataSet(dataSet: DataSet[Row]): Unit = {
  dataSet.output(jdbcOutputFormat)
}
```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-06-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r122970495
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ */
+public class JDBCTableSink extends RichSinkFunction<Row>
+   implements AppendStreamTableSink<Row>, CheckpointedFunction {
+   private final JDBCOutputFormat outputFormat;
+
+   private String[] fieldNames;
+   private TypeInformation<?>[] fieldTypes;
+
+   public JDBCTableSink(JDBCOutputFormat outputFormat) {
--- End diff --

What do you think about not exposing the `JDBCOutputFormat` to the user,
but configuring it internally?

Of course we would need many of the configuration parameters (user, pw,
driver, dburl, and table name). Users could either specify the field names of
the table to write to (fields are mapped by position) or not (we use the field
names of the `Table` to emit). From this information we can construct a
parameterized insert query: `INSERT INTO $table ($f1, $f2, $f3) VALUES (?, ?,
?)`. The field types are automatically provided by the `configure()` call.

This would be a tighter integration with the Table API (using provided 
field types and possibly field names).
Does this work for your use case or do you need the flexibility of 
specifying your own query?
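
For illustration, deriving the parameterized insert statement from the table name and field names could look like this (a sketch under the naming above; `buildInsertQuery` is not part of the PR):

```
static String buildInsertQuery(String table, String[] fieldNames) {
    StringBuilder columns = new StringBuilder();
    StringBuilder placeholders = new StringBuilder();
    for (int i = 0; i < fieldNames.length; i++) {
        if (i > 0) {
            columns.append(", ");
            placeholders.append(", ");
        }
        columns.append(fieldNames[i]);
        placeholders.append("?");
    }
    // e.g. buildInsertQuery("mytable", new String[]{"f1", "f2", "f3"})
    // -> INSERT INTO mytable (f1, f2, f3) VALUES (?, ?, ?)
    return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
}
```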


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-06-07 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120765156
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
upload.addBatch();
batchCount++;
if (batchCount >= batchInterval) {
-   upload.executeBatch();
-   batchCount = 0;
+   flush();
}
} catch (SQLException | IllegalArgumentException e) {
throw new IllegalArgumentException("writeRecord() failed", e);
}
}
 
+   void flush() throws SQLException {
+   if (upload != null) {
+   upload.executeBatch();
--- End diff --

It is a synchronous call. It will throw `SQLException` and abort the sink. 
The behavior has not been changed.
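
For context, the complete `flush()` method the hunk above truncates plausibly reads as follows, resetting the batch counter after a successful submit (reconstructed from the `writeRecord()` change shown, not quoted verbatim):

```
void flush() throws SQLException {
    if (upload != null) {
        // Synchronous call: an SQLException propagates to the caller
        // and fails the sink, as noted above.
        upload.executeBatch();
        batchCount = 0;
    }
}
```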


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-06-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120576118
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
upload.addBatch();
batchCount++;
if (batchCount >= batchInterval) {
-   upload.executeBatch();
-   batchCount = 0;
+   flush();
}
} catch (SQLException | IllegalArgumentException e) {
throw new IllegalArgumentException("writeRecord() failed", e);
}
}
 
+   void flush() throws SQLException {
+   if (upload != null) {
+   upload.executeBatch();
--- End diff --

It's been a while since I worked with JDBC; I take it this is a synchronous
call? What happens if this call fails?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-06-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120576032
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for JDBCTableSink.
+ */
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+
+   @Test
+   public void testOutputSink() throws Exception {
+   JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
+   JDBCTableSink sink = new JDBCTableSink(outputFormat);
+   @SuppressWarnings("unchecked")
+   DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
+   sink.emitDataStream(dataStream);
+   verify(dataStream).addSink(sink);
--- End diff --

You don't have to test this, as it is not a detail of the JDBCTableSink but
of the Table API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-06-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r120575757
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for JDBCTableSink.
+ */
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[]{"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+   BasicTypeInfo.STRING_TYPE_INFO
+   };
+
+   @Test
+   public void testOutputSink() throws Exception {
+   JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
+   JDBCTableSink sink = new JDBCTableSink(outputFormat);
+   @SuppressWarnings("unchecked")
+   DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
+   sink.emitDataStream(dataStream);
+   verify(dataStream).addSink(sink);
+   }
+
+   @Test
+   public void testFlush() throws Exception {
+   JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
+   JDBCTableSink sink = new JDBCTableSink(outputFormat);
+   @SuppressWarnings("unchecked")
+   DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
+   sink.emitDataStream(dataStream);
+   sink.snapshotState(mock(FunctionSnapshotContext.class));
+   verify(dataStream).addSink(sink);
+   verify(outputFormat).flush();
--- End diff --

Let's not use mocking for this test. Just create an actual format/sink,
give N values to the sink where N < batchSize, verify they haven't been written
yet, call flush, and verify they were written.
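
A sketch of such a mock-free test (the driver/URL/table constants and the `countRows` helper are assumed to come from a test base class; they are illustrations, not PR code):

```
@Test
public void testFlushBeforeBatchIntervalIsReached() throws Exception {
    JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery("INSERT INTO " + OUTPUT_TABLE + " VALUES (?)")
        .setBatchInterval(5)
        .finish();
    format.open(0, 1);

    // Write N < batchInterval rows: they stay buffered in the batch.
    for (int i = 0; i < 3; i++) {
        Row row = new Row(1);
        row.setField(0, "value-" + i);
        format.writeRecord(row);
    }
    assertEquals(0, countRows(OUTPUT_TABLE)); // hypothetical helper querying the DB

    // flush() forces the buffered batch out.
    format.flush();
    assertEquals(3, countRows(OUTPUT_TABLE));

    format.close();
}
```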


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113411053
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[] {"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
--- End diff --

Remove the space after `[]`. Move `STRING_TYPE_INFO` to this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113412006
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation<?>[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
+   JDBCOutputFormat outputFormat, String[] fieldNames,
+   TypeInformation<?>[] fieldTypes) throws Exception {
+   super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+   this.outputFormat = outputFormat;
+   this.committer = committer;
+   this.fieldNames = fieldNames;
+   this.fieldTypes = fieldTypes;
+   }
+
+   @Override
+   public void emitDataStream(DataStream<Row> dataStream) {
+   dataStream.transform("JDBC Sink", getOutputType(), this);
+   }
+
+   @Override
+   public TypeInformation<Row> getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation<?>[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+   try {
+   return new JDBCTableSink(committer, serializer, outputFormat, fieldNames, fieldTypes);
+   } catch (Exception e) {
+   LOG.warn("Failed to create a copy of the sink.", e);
+   return null;
+   }
+   }
+
+   @Override
+   protected boolean sendValues(Iterable<Row> value, long timestamp) throws Exception {
+   for (Row r : value) {
+   try {
+   outputFormat.writeRecord(r);
--- End diff --

This doesn't guarantee in any way that the values are actually being sent;
you need some kind of flushing functionality for this to work properly.
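
A sketch of `sendValues()` with the flushing this comment asks for, assuming a `flush()` method on `JDBCOutputFormat` like the one added later in this thread:

```
@Override
protected boolean sendValues(Iterable<Row> values, long timestamp) throws Exception {
    for (Row row : values) {
        // writeRecord() only buffers until batchInterval is reached.
        outputFormat.writeRecord(row);
    }
    // Force the remainder of the batch out before the checkpoint is
    // acknowledged; otherwise buffered rows could be lost on failure.
    outputFormat.flush();
    return true;
}
```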


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113412806
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation<?>[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
--- End diff --

I would propose either adding a JDBCCheckpointCommitter that cooperates
with the sink (as seen in this
[prototype](https://github.com/zentol/flink/commit/92e878b59a7371ac9cad402d0b009c7439cd1900))
or omitting the `CheckpointCommitter` argument and providing a dummy to the
`GenericWriteAheadSink`.
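
For the second option, a dummy committer could look like this (a sketch against the `CheckpointCommitter` contract assumed for the Flink version under review; always answering "not committed" makes the write-ahead sink replay pending checkpoints on recovery, i.e. at-least-once):

```
class NoOpCheckpointCommitter extends CheckpointCommitter {
    @Override
    public void open() throws Exception {
        // No external resource to connect to.
    }

    @Override
    public void close() throws Exception {}

    @Override
    public void createResource() throws Exception {}

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) throws Exception {
        // Intentionally records nothing.
    }

    @Override
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) throws Exception {
        return false;
    }
}
```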


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410993
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[] {"foo"};
--- End diff --

Remove the space after `[]`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410894
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation<?>[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
+   JDBCOutputFormat outputFormat, String[] fieldNames,
+   TypeInformation<?>[] fieldTypes) throws Exception {
--- End diff --

Like the Cassandra sink, the `fieldNames/Types` should be removed to provide
a clean API to the user.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-12 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3712

[FLINK-6281] Create TableSink for JDBC.

This PR implements the `StreamTableSink` interface for the JDBC connectors 
so that the streaming SQL APIs can directly interact with them.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-6281

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3712.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3712


commit 2bfd014bedc5bd64f346652dfb5ddb41cc36cc3f
Author: Haohui Mai 
Date:   2017-04-12T06:56:56Z

[FLINK-6281] Create TableSink for JDBC.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---