[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-28 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213408001
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138309
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
 
 Review comment:
   nit: include the JDBC URL in the log message, e.g. `log.info("Open jdbc connection : {}", jdbcUrl);`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139143
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139185
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139561
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
 ##
 @@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableId {
+private final String catalogName;
+private final String schemaName;
+private final String tableName;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class ColumnId {
+private final TableId tableId;
+private final String name;
+// SQL type from java.sql.Types
+private final int type;
+private final String typeName;
+// column position in table
+private final int position;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableDefinition {
+private final TableId tableId;
+private final List columns;
+}
+
+/**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types
+ */
+public static String getDriverClassName(String driver) throws Exception {
+if (driver.equals("mysql")) {
+return "com.mysql.jdbc.Driver";
+} if (driver.equals("sqlite")) {
+return "org.sqlite.JDBC";
+} else {
+throw new Exception("Not tested jdbc driver type: " + driver);
+}
+}
+
+/**
+ * Get the {@link Connection} for the given jdbcUrl.
+ */
+public static Connection getConnection(String jdbcUrl, Properties 
properties) throws Exception {
+String driver = jdbcUrl.split(":")[1];
+String driverClassName = getDriverClassName(driver);
+Class.forName(driverClassName);
+
+return DriverManager.getConnection(jdbcUrl, properties);
+}
+
+/**
+ * Get the {@link TableId} for the given tableName.
+ */
+public static TableId getTableId(Connection connection, String tableName) 
throws Exception {
+DatabaseMetaData metadata = connection.getMetaData();
+try (ResultSet rs = metadata.getTables(null, null, tableName, new 
String[]{"TABLE"})) {
+if (rs.next()) {
+String catalogName = rs.getString(1);
+String schemaName = rs.getString(2);
+String gotTableName = rs.getString(3);
+checkState(tableName.equals(gotTableName),
+"TableName not match: " + tableName + " Got: " + 
gotTableName);
+log.debug("Get Table: {}, {}, {}", catalogName, schemaName, 
tableName);
 
 Review comment:
   Guard this debug log call with `if (log.isDebugEnabled())`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213140305
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 ##
 @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder 
beforeSetupCluster(String clusterName, Pulsar
 .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
 .withName(kafkaServiceName)
 .withHostName(clusterName + "-" + kafkaServiceName)));
+
 final String cassandraServiceName = "cassandra";
 externalServices.put(
 cassandraServiceName,
 new CassandraContainer(clusterName));
+
+// use mySQL for jdbc test
+final String jdbcServiceName = "jdbc";
+externalServices.put(
+jdbcServiceName,
+new MySQLContainer()
 
 Review comment:
   Attach clusterName to the hostname, as is already done for the Kafka container above:
   
   ```
   new MySQLContainer()
       .withNetworkAliases(jdbcServiceName)
       .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
           .withName(jdbcServiceName)
           .withHostName(clusterName + "-" + jdbcServiceName)));
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138387
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
 
 Review comment:
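   nit: include the JDBC URL in the log message, e.g. `log.info("Closed jdbc connection : {}", jdbcUrl);`. Note that `jdbcUrl` is currently a local variable in `open()`, so it would need to be kept in a field (or read via `jdbcSinkConfig.getJdbcUrl()`) to be available in `close()`.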


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139604
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
 ##
 @@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableId {
+private final String catalogName;
+private final String schemaName;
+private final String tableName;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class ColumnId {
+private final TableId tableId;
+private final String name;
+// SQL type from java.sql.Types
+private final int type;
+private final String typeName;
+// column position in table
+private final int position;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableDefinition {
+private final TableId tableId;
+private final List columns;
+}
+
+/**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types
+ */
+public static String getDriverClassName(String driver) throws Exception {
+if (driver.equals("mysql")) {
+return "com.mysql.jdbc.Driver";
+} if (driver.equals("sqlite")) {
+return "org.sqlite.JDBC";
+} else {
+throw new Exception("Not tested jdbc driver type: " + driver);
+}
+}
+
+/**
+ * Get the {@link Connection} for the given jdbcUrl.
+ */
+public static Connection getConnection(String jdbcUrl, Properties 
properties) throws Exception {
+String driver = jdbcUrl.split(":")[1];
+String driverClassName = getDriverClassName(driver);
+Class.forName(driverClassName);
+
+return DriverManager.getConnection(jdbcUrl, properties);
+}
+
+/**
+ * Get the {@link TableId} for the given tableName.
+ */
+public static TableId getTableId(Connection connection, String tableName) 
throws Exception {
+DatabaseMetaData metadata = connection.getMetaData();
+try (ResultSet rs = metadata.getTables(null, null, tableName, new 
String[]{"TABLE"})) {
+if (rs.next()) {
+String catalogName = rs.getString(1);
+String schemaName = rs.getString(2);
+String gotTableName = rs.getString(3);
+checkState(tableName.equals(gotTableName),
+"TableName not match: " + tableName + " Got: " + 
gotTableName);
+log.debug("Get Table: {}, {}, {}", catalogName, schemaName, 
tableName);
+return TableId.of(catalogName, schemaName, tableName);
+} else {
+throw new Exception("Not able to find table: " + tableName);
+}
+}
+}
+
+/**
+ * Get the {@link TableDefinition} for the given table.
+ */
+public static TableDefinition getTableDefinition(Connection connection, 
TableId tableId) throws Exception {
+TableDefinition table = TableDefinition.of(tableId, 
Lists.newArrayList());
+
+try (ResultSet rs = connection.getMetaData().getColumns(
+tableId.getCatalogName(),
+tableId.getSchemaName(),
+ 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139256
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139527
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
 ##
 @@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableId {
+private final String catalogName;
+private final String schemaName;
+private final String tableName;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class ColumnId {
+private final TableId tableId;
+private final String name;
+// SQL type from java.sql.Types
+private final int type;
+private final String typeName;
+// column position in table
+private final int position;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableDefinition {
+private final TableId tableId;
+private final List columns;
+}
+
+/**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types
+ */
+public static String getDriverClassName(String driver) throws Exception {
+if (driver.equals("mysql")) {
+return "com.mysql.jdbc.Driver";
+} if (driver.equals("sqlite")) {
+return "org.sqlite.JDBC";
+} else {
+throw new Exception("Not tested jdbc driver type: " + driver);
 
 Review comment:
  Probably log a warning message for untested driver types rather than failing here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138978
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213140171
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 ##
 @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder 
beforeSetupCluster(String clusterName, Pulsar
 .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
 .withName(kafkaServiceName)
 .withHostName(clusterName + "-" + kafkaServiceName)));
+
 final String cassandraServiceName = "cassandra";
 externalServices.put(
 cassandraServiceName,
 new CassandraContainer(clusterName));
+
+// use mySQL for jdbc test
+final String jdbcServiceName = "jdbc";
 
 Review comment:
  Suggest naming it after the actual database: `jdbcServiceName = "mysql"`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212717203
 
 

 ##
 File path: tests/integration/pom.xml
 ##
 @@ -78,6 +78,20 @@
   ${project.version}
   test
 
+
+
+  org.testcontainers
+  mysql
+  1.8.3
 
 Review comment:
  Define `org.testcontainers:mysql` in the root pom, so you don't need to include the version here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212716824
 
 

 ##
 File path: pulsar-io/jdbc/src/test/resources/META-INF/services/pulsar-io.yaml
 ##
 @@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: jdbc
+description: Jdbc sink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcStringSink
 
 Review comment:
  I don't think you need this file under `src/test/resources`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212716512
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
 ##
 @@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class JdbcSinkConfig implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+private String userName;
+private String password;
+private String jdbcUrl;
+private String tableName;
+
+// schema for input topic
+private String schema;
 
 Review comment:
  Add a `TODO` noting that this field will be removed later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212717520
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
 ##
 @@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+
+/**
+ * A tester for testing jdbc sink.
+ * This will use MySql as DB server
+ */
+@Slf4j
+public class JdbcSinkTester extends SinkTester {
+
+/**
+ * A Simple class to test jdbc class,
+ *
+ */
+@Data
+@ToString
+@EqualsAndHashCode
+public static class Foo {
+private final String field1;
+private final String field2;
+private final int field3;
+}
+
+private static final String NAME = "jdbc";
+
+private MySQLContainer mySQLContainer;
+private AvroSchema schema = AvroSchema.of(Foo.class);
+private String tableName = "test";
+private Connection connection;
+
+public JdbcSinkTester() {
+super(NAME);
+
+// container default value is test
+sinkConfig.put("userName", "test");
+sinkConfig.put("password", "test");
+sinkConfig.put("tableName", tableName);
+
+// prepare schema
+sinkConfig.put("schema",  new 
String(schema.getSchemaInfo().getSchema()));
+log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
+sinkConfig.put("batchSize", 1);
+}
+
+@Override
+public void findSinkServiceContainer(Map> 
containers) {
+GenericContainer container = containers.get(NAME);
+checkState(container instanceof MySQLContainer,
+"No kafka service found in the cluster");
 
 Review comment:
  The message should read "No MySQL service found" rather than "No kafka service found".


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212714903
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+private String schema;
+
+private JdbcUtils.TableId tableId;
+private JdbcUtils.TableDefinition tableDefinition;
+private PreparedStatement insertStatement;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) 
throws Exception {
+// TODO: currently assume schema is same as created table schema;
+// turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)?
+
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection open");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception{
+synchronized (incomingList) {
+incomingList.add(record);
+}
+
+if (incomingList.size() >= batchSize) {
+flushExecutor.schedule(() -> 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212717769
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
 ##
 @@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+
+/**
+ * A tester for testing jdbc sink.
+ * This will use MySql as DB server
+ */
+@Slf4j
+public class JdbcSinkTester extends SinkTester {
+
+/**
+ * A Simple class to test jdbc class,
+ *
+ */
+@Data
+@ToString
+@EqualsAndHashCode
+public static class Foo {
+private final String field1;
+private final String field2;
+private final int field3;
+}
+
+private static final String NAME = "jdbc";
+
+private MySQLContainer mySQLContainer;
+private AvroSchema schema = AvroSchema.of(Foo.class);
+private String tableName = "test";
+private Connection connection;
+
+public JdbcSinkTester() {
+super(NAME);
+
+// container default value is test
+sinkConfig.put("userName", "test");
+sinkConfig.put("password", "test");
+sinkConfig.put("tableName", tableName);
+
+// prepare schema
+sinkConfig.put("schema",  new 
String(schema.getSchemaInfo().getSchema()));
+log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
+sinkConfig.put("batchSize", 1);
+}
+
+@Override
+public void findSinkServiceContainer(Map> 
containers) {
+GenericContainer container = containers.get(NAME);
+checkState(container instanceof MySQLContainer,
+"No kafka service found in the cluster");
+
+this.mySQLContainer = (MySQLContainer) container;
+log.info("find sink service container: {}", 
mySQLContainer.getContainerName());
+}
+
+@Override
+public void prepareSink() throws Exception {
+String jdbcUrl = mySQLContainer.getJdbcUrl();
+sinkConfig.put("jdbcUrl", jdbcUrl);
+String driver = mySQLContainer.getDriverClassName();
+Class.forName(driver);
+
+connection = DriverManager.getConnection(jdbcUrl, "test", "test");
+
+// create table
+String createTable = "CREATE TABLE " + tableName +
+" (field1 TEXT, field2 TEXT, field3 INTEGER, PRIMARY KEY 
(field3))";
+connection.createStatement().executeUpdate(createTable);
+log.info("created table in jdbc: {}", createTable);
+}
+
+@Override
+public void validateSinkResult(Map kvs) {
+log.info("Query table content from mysql server: {}", tableName);
+String querySql = "SELECT * FROM " + tableName;
+ResultSet rs;
+try {
+PreparedStatement statement = 
connection.prepareStatement(querySql);
+rs = statement.executeQuery();
+
+while (rs.next()) {
+String field1 = rs.getString(1);
+String field2 = rs.getString(2);
+int field3 = rs.getInt(3);
+log.info("Row : {}, {}, {}", field3, field1, field2);
+
+String value = kvs.get("key-" + field3);
+log.info("Value in Kvs: {}", value);
+Foo obj = schema.decode(value.getBytes());
+assertEquals(obj.field1, field1);
+assertEquals(obj.field2, field2);
+assertEquals(obj.field3, field3);
+}
+} catch (Exception e) {
+e.printStackTrace();
 
 Review 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212715969
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+// Parsed sink configuration, loaded in open().
+private JdbcSinkConfig jdbcSinkConfig;
+// JDBC connection opened in open(); exposed through the Lombok-generated getConnection().
+@Getter
+private Connection connection;
+// Target table name taken from the config in open().
+private String tableName;
+// Value of the "schema" config option. NOTE(review): presumably handed to subclasses'
+// bindValue(...) — its consumer is not visible here, confirm.
+private String schema;
+
+// Table metadata resolved from tableName in open(); insertStatement is prepared from
+// the SQL built for tableDefinition.
+private JdbcUtils.TableId tableId;
+private JdbcUtils.TableDefinition tableDefinition;
+private PreparedStatement insertStatement;
+
+// for flush
+// write() appends incoming records to incomingList under its monitor.
+// NOTE(review): swapList presumably receives the buffered records during flush() — confirm.
+private List> incomingList;
+private List> swapList;
+// NOTE(review): presumably guards against overlapping flush() runs; flush() is not visible here.
+private AtomicBoolean isFlushing;
+// Period, in milliseconds, of the scheduled flush task started in open().
+private int timeoutMs;
+// write() triggers a flush once incomingList holds at least this many records.
+private int batchSize;
+// Scheduled pool (one core thread, created in open()) that runs flush() periodically.
+private ScheduledExecutorService flushExecutor;
+
+/**
+ * Opens the JDBC connection, resolves the target table, prepares the insert statement,
+ * and starts the periodic flush task.
+ *
+ * @param config raw sink configuration, parsed with {@code JdbcSinkConfig.load}
+ * @param sinkContext sink runtime context; not used yet
+ * @throws IllegalArgumentException if {@code jdbcUrl} is not set or {@code timeoutMs} is not positive
+ * @throws Exception if loading the config, connecting, or reading the table metadata fails
+ */
+@Override
+public void open(Map config, SinkContext sinkContext) throws Exception {
+    // TODO: currently assume schema is same as created table schema;
+    // turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)?
+
+    jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+    String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+    if (jdbcUrl == null) {
+        throw new IllegalArgumentException("Required jdbc Url not set.");
+    }
+
+    timeoutMs = jdbcSinkConfig.getTimeoutMs();
+    batchSize = jdbcSinkConfig.getBatchSize();
+    // Validate before connecting: scheduleAtFixedRate() rejects a non-positive period, and
+    // failing there would leave the connection and the executor created below open.
+    if (timeoutMs <= 0) {
+        throw new IllegalArgumentException("timeoutMs must be positive, got: " + timeoutMs);
+    }
+
+    Properties properties = new Properties();
+    String username = jdbcSinkConfig.getUserName();
+    String password = jdbcSinkConfig.getPassword();
+    if (username != null) {
+        properties.setProperty("user", username);
+    }
+    if (password != null) {
+        properties.setProperty("password", password);
+    }
+
+    connection = JdbcUtils.getConnection(jdbcUrl, properties);
+    log.info("Connection open");
+
+    schema = jdbcSinkConfig.getSchema();
+    tableName = jdbcSinkConfig.getTableName();
+    tableId = JdbcUtils.getTableId(connection, tableName);
+    tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+    insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+    incomingList = Lists.newArrayList();
+    swapList = Lists.newArrayList();
+    isFlushing = new AtomicBoolean(false);
+
+    flushExecutor = Executors.newScheduledThreadPool(1);
+    flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+}
+
+/**
+ * Stops the flush scheduler, commits any open transaction, and closes the connection.
+ * Tolerates being called when open() failed before the executor or connection existed.
+ *
+ * @throws Exception if waiting for the executor is interrupted, or committing/closing fails
+ */
+@Override
+public void close() throws Exception {
+    if (flushExecutor != null) {
+        // Stop the scheduler before touching the connection so no flush races with commit/close.
+        // shutdown() cancels the periodic task; one-shot flushes already queued via schedule()
+        // still run under the default ScheduledThreadPoolExecutor policy, so wait for them.
+        flushExecutor.shutdown();
+        if (!flushExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+            log.warn("Flush executor did not terminate within {} ms", timeoutMs);
+        }
+    }
+    // NOTE(review): records still buffered in incomingList are not flushed here — confirm
+    // whether close() should drain them.
+    if (connection != null) {
+        try {
+            if (!connection.getAutoCommit()) {
+                connection.commit();
+            }
+        } finally {
+            // Close even if the commit fails, so the connection is not leaked.
+            connection.close();
+        }
+    }
+    log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception{
+synchronized (incomingList) {
+incomingList.add(record);
+}
+
+if (incomingList.size() >= batchSize) {
+flushExecutor.schedule(() -> 

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212716381
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
 ##
 @@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.sql.PreparedStatement;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
+
+/**
+ * A Simple Jdbc sink, which assume input Record as AvroSchema format
+ */
+@Slf4j
+public class JdbcAvroSchemaSink extends JdbcAbstractSink {
+
+private Schema avroSchema = null;
+private DatumReader reader = null;
+
+public void bindValue(PreparedStatement statement,
+  TableDefinition tableDefinition,
+  String schema,
+  Record message) throws Exception {
+
+log.info("schema: {}", schema);
 
 Review comment:
  Lines 48-56 should be done once in `#open` rather than in `bindValue` for every record.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212716931
 
 

 ##
 File path: pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
 ##
 @@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: jdbc
+description: Jdbc sink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcStringSink
 
 Review comment:
   this should be org.apache.pulsar.io.jdbc.JdbcAvroSink


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-24 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r212714732
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+// Parsed sink configuration, loaded in open().
+private JdbcSinkConfig jdbcSinkConfig;
+// JDBC connection opened in open(); exposed through the Lombok-generated getConnection().
+@Getter
+private Connection connection;
+// Target table name taken from the config in open().
+private String tableName;
+// Value of the "schema" config option. NOTE(review): presumably handed to subclasses'
+// bindValue(...) — its consumer is not visible here, confirm.
+private String schema;
+
+// Table metadata resolved from tableName in open(); insertStatement is prepared from
+// the SQL built for tableDefinition.
+private JdbcUtils.TableId tableId;
+private JdbcUtils.TableDefinition tableDefinition;
+private PreparedStatement insertStatement;
+
+// for flush
+// write() appends incoming records to incomingList under its monitor.
+// NOTE(review): swapList presumably receives the buffered records during flush() — confirm.
+private List> incomingList;
+private List> swapList;
+// NOTE(review): presumably guards against overlapping flush() runs; flush() is not visible here.
+private AtomicBoolean isFlushing;
+// Period, in milliseconds, of the scheduled flush task started in open().
+private int timeoutMs;
+// write() compares incomingList's size against this threshold to decide on a flush.
+private int batchSize;
+// Scheduled pool (one core thread, created in open()) that runs flush() periodically.
+private ScheduledExecutorService flushExecutor;
+
+/**
+ * Opens the JDBC connection, resolves the target table, prepares the insert statement,
+ * and starts the periodic flush task.
+ *
+ * @param config raw sink configuration, parsed with {@code JdbcSinkConfig.load}
+ * @param sinkContext sink runtime context; not used yet
+ * @throws IllegalArgumentException if {@code jdbcUrl} is not set or {@code timeoutMs} is not positive
+ * @throws Exception if loading the config, connecting, or reading the table metadata fails
+ */
+@Override
+public void open(Map config, SinkContext sinkContext) throws Exception {
+    // TODO: currently assume schema is same as created table schema;
+    // turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)?
+
+    jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+    String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+    if (jdbcUrl == null) {
+        throw new IllegalArgumentException("Required jdbc Url not set.");
+    }
+
+    timeoutMs = jdbcSinkConfig.getTimeoutMs();
+    batchSize = jdbcSinkConfig.getBatchSize();
+    // Validate before connecting: scheduleAtFixedRate() rejects a non-positive period, and
+    // failing there would leave the connection and the executor created below open.
+    if (timeoutMs <= 0) {
+        throw new IllegalArgumentException("timeoutMs must be positive, got: " + timeoutMs);
+    }
+
+    Properties properties = new Properties();
+    String username = jdbcSinkConfig.getUserName();
+    String password = jdbcSinkConfig.getPassword();
+    if (username != null) {
+        properties.setProperty("user", username);
+    }
+    if (password != null) {
+        properties.setProperty("password", password);
+    }
+
+    connection = JdbcUtils.getConnection(jdbcUrl, properties);
+    log.info("Connection open");
+
+    schema = jdbcSinkConfig.getSchema();
+    tableName = jdbcSinkConfig.getTableName();
+    tableId = JdbcUtils.getTableId(connection, tableName);
+    tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+    insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+    incomingList = Lists.newArrayList();
+    swapList = Lists.newArrayList();
+    isFlushing = new AtomicBoolean(false);
+
+    flushExecutor = Executors.newScheduledThreadPool(1);
+    flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+}
+
+/**
+ * Stops the flush scheduler, commits any open transaction, and closes the connection.
+ * Tolerates being called when open() failed before the executor or connection existed.
+ *
+ * @throws Exception if waiting for the executor is interrupted, or committing/closing fails
+ */
+@Override
+public void close() throws Exception {
+    if (flushExecutor != null) {
+        // Stop the scheduler before touching the connection so no flush races with commit/close.
+        // shutdown() cancels the periodic task; delayed one-shot tasks already queued still
+        // run under the default ScheduledThreadPoolExecutor policy, so wait for them.
+        flushExecutor.shutdown();
+        if (!flushExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+            log.warn("Flush executor did not terminate within {} ms", timeoutMs);
+        }
+    }
+    // NOTE(review): records still buffered in incomingList are not flushed here — confirm
+    // whether close() should drain them.
+    if (connection != null) {
+        try {
+            if (!connection.getAutoCommit()) {
+                connection.commit();
+            }
+        } finally {
+            // Close even if the commit fails, so the connection is not leaked.
+            connection.close();
+        }
+    }
+    log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception{
+synchronized (incomingList) {
+incomingList.add(record);
+}
+
+if (incomingList.size() >= batchSize) {
 
 Review comment:
   incomingList.size() should