[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378897#comment-15378897 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user asfgit closed the pull request at: https://github.com/apache/apex-malhar/pull/282 > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. > Assumptions for idempotency & partitioning, > 1.User needs to provide tableName,dbConnection,setEmitColumnList,look-up key. > 2.Optionally batchSize,pollInterval,Look-up key and a where clause can be > given. > 3.This operator uses static partitioning to arrive at range queries for > exactly once reads. > This operator will create a configured number of non-polling static > partitions for fetching the existing data in the table. And an additional > single partition for polling additive data. > 4.Assumption is that there is an ordered column using which range queries can > be formed. > The *key* column, based on which the polling will happen, is any column which > has ever increasing values and supports greater than and less > than operations in SQL. > 5.If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list > 6.Range queries are formed using the JdbcMetaDataUtility Output - comma > separated list of the emit columns eg columnA,columnB,columnC > 7. 
Only newly added data which has increasing ids will be fetched by the >polling JDBC partition. > Per window the first and the last key processed is saved using the > FSWindowDataManager - (,operatorId,windowId). This > (lowerBound,upperBoundPair) is then used for recovery. The queries are > constructed using the JdbcMetaDataUtility. > JdbcMetaDataUtility > A utility class used to retrieve the metadata for a given unique key of a SQL > table. This class would emit range queries based on a primary index given. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373332#comment-15373332 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user devtagare commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r70488149 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator--- End diff -- Done > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. > Assumptions for idempotency & partitioning, > 1.User needs to provide tableName,dbConnection,setEmitColumnList,look-up key. > 2.Optionally batchSize,pollInterval,Look-up key and a where clause can be > given. > 3.This operator uses static partitioning to arrive at range queries for > exactly once reads > 4.Assumption is that there is an ordered column using which range queries can > be formed > 5.If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list > 6.Range queries are formed using the
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373298#comment-15373298 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user devtagare commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r70483252 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java --- @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * A utility class used to retrieve the metadata for a given unique key of a SQL + * table. 
This class would emit range queries based on a primary index given + * + * @Input - dbName,tableName, primaryKey + * @Output - map+ * + */ +public class JdbcMetaDataUtility +{ + private static String DB_DRIVER = "com.mysql.jdbc.Driver"; + private static String DB_CONNECTION = ""; + private static String DB_USER = ""; + private static String DB_PASSWORD = ""; + private static String TABLE_NAME = ""; + private static String KEY_COLUMN = ""; + private static String WHERE_CLAUSE = null; + private static String COLUMN_LIST = null; + + private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class); + + public JdbcMetaDataUtility() + { + + } + + public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password) + { +DB_CONNECTION = dbConnection; +DB_USER = userName; +DB_PASSWORD = password; +TABLE_NAME = tableName; +KEY_COLUMN = key; + } + + private static Connection getDBConnection() + { + +Connection dbConnection = null; + +try { + Class.forName(DB_DRIVER); +} catch (ClassNotFoundException e) { + LOG.error("Driver not found", e); +} + +try { + dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + return dbConnection; +} catch (SQLException e) { + LOG.error("Exception in getting connection handle", e); --- End diff -- Done > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. > Assumptions for idempotency & partitioning, > 1.User needs to provide tableName,dbConnection,setEmitColumnList,look-up key. 
> 2.Optionally batchSize,pollInterval,Look-up key and a where clause can be > given. > 3.This operator uses static partitioning to arrive at range queries for > exactly once reads > 4.Assumption is that there is an ordered column using which range queries can > be formed > 5.If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list > 6.Range queries are formed using the JdbcMetaDataUtility Output - comma > separated list of the emit columns eg columnA,columnB,columnC > Per window the first and the last key processed is saved using the > FSWindowDataManager -
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373297#comment-15373297 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user devtagare commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r70483210 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java --- @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * A utility class used to retrieve the metadata for a given unique key of a SQL + * table. 
This class would emit range queries based on a primary index given + * + * @Input - dbName,tableName, primaryKey + * @Output - map+ * + */ +public class JdbcMetaDataUtility +{ + private static String DB_DRIVER = "com.mysql.jdbc.Driver"; + private static String DB_CONNECTION = ""; + private static String DB_USER = ""; + private static String DB_PASSWORD = ""; + private static String TABLE_NAME = ""; + private static String KEY_COLUMN = ""; + private static String WHERE_CLAUSE = null; + private static String COLUMN_LIST = null; + + private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class); + + public JdbcMetaDataUtility() + { + + } + + public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password) + { +DB_CONNECTION = dbConnection; +DB_USER = userName; +DB_PASSWORD = password; +TABLE_NAME = tableName; +KEY_COLUMN = key; + } + + private static Connection getDBConnection() + { + +Connection dbConnection = null; + +try { + Class.forName(DB_DRIVER); +} catch (ClassNotFoundException e) { + LOG.error("Driver not found", e); --- End diff -- Done > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. > Assumptions for idempotency & partitioning, > 1.User needs to provide tableName,dbConnection,setEmitColumnList,look-up key. > 2.Optionally batchSize,pollInterval,Look-up key and a where clause can be > given. 
> 3.This operator uses static partitioning to arrive at range queries for > exactly once reads > 4.Assumption is that there is an ordered column using which range queries can > be formed > 5.If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list > 6.Range queries are formed using the JdbcMetaDataUtility Output - comma > separated list of the emit columns eg columnA,columnB,columnC > Per window the first and the last key processed is saved using the > FSWindowDataManager - ( ,operatorId,windowId).This > (lowerBound,upperBoundPair) is then used for recovery.The queries are > constructed using the JDBCMetaDataUtility. > JDBCMetaDataUtility > A utility class used to retrieve the metadata for a given unique key of a SQL > table. This class would emit range
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372281#comment-15372281 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r70385310 --- Diff: library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java --- @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Partitioner; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +/** + * Tests for {@link AbstractJdbcPollInputOperator} and + * {@link JdbcPollInputOperator} + */ +public class JdbcPollerTest +{ + public static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + + private static final String TABLE_NAME = "test_account_table"; + private static String APP_ID = "JdbcPollingOperatorTest"; + public String dir = null; + + @BeforeClass + public static void setup() + { +try { + cleanup(); +} catch (Exception e) { + throw new RuntimeException(e); +} +try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)"; + stmt.executeUpdate(createTable); + +} catch (Exception e) { + throw new RuntimeException(e); +} + } + + @AfterClass + public static void cleanup() + { +try { + FileUtils.deleteDirectory(new File("target/" + APP_ID)); +} catch (IOException e) { + throw new 
RuntimeException(e); +} + } + + public static void cleanTable() + { +try { + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String cleanTable = "delete from " + TABLE_NAME; + stmt.executeUpdate(cleanTable); +} catch (SQLException e) { + throw new RuntimeException(e); +} + } + + public static void insertEventsInTable(int numEvents, int offset) + { +try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { +stmt.setInt(1, offset); +stmt.setString(2, "Account_Holder-" + offset); +stmt.setInt(3, (offset * 1000)); +stmt.executeUpdate(); + } +} catch (SQLException e) { + throw new RuntimeException(e); +} + } + + /** + * Simulates actual application flow Adds a batch query partitiom, a pollable + * partition
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325524#comment-15325524 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r66695465 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java --- @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * A utility class used to retrieve the metadata for a given unique key of a SQL + * table. 
This class would emit range queries based on a primary index given + * + * @Input - dbName,tableName, primaryKey + * @Output - map+ * + */ +public class JdbcMetaDataUtility +{ + private static String DB_DRIVER = "com.mysql.jdbc.Driver"; + private static String DB_CONNECTION = ""; + private static String DB_USER = ""; + private static String DB_PASSWORD = ""; + private static String TABLE_NAME = ""; + private static String KEY_COLUMN = ""; + private static String WHERE_CLAUSE = null; + private static String COLUMN_LIST = null; + + private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class); + + public JdbcMetaDataUtility() + { + + } + + public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password) + { +DB_CONNECTION = dbConnection; +DB_USER = userName; +DB_PASSWORD = password; +TABLE_NAME = tableName; +KEY_COLUMN = key; + } + + private static Connection getDBConnection() + { + +Connection dbConnection = null; + +try { + Class.forName(DB_DRIVER); +} catch (ClassNotFoundException e) { + LOG.error("Driver not found", e); +} + +try { + dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + return dbConnection; +} catch (SQLException e) { + LOG.error("Exception in getting connection handle", e); +} + +return dbConnection; + + } + + private static String generateQueryString() + { +StringBuilder sb = new StringBuilder(); +sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME); + +if (WHERE_CLAUSE != null) { + sb.append(" WHERE " + WHERE_CLAUSE); +} + +return sb.toString(); + } + + /** + * Finds the total number of rows in the table + */ + private static long getRecordRange(String query) throws SQLException + { +long rowCount = 0; +Connection dbConnection = null; +PreparedStatement preparedStatement = null; + +try { + dbConnection = getDBConnection(); + preparedStatement = dbConnection.prepareStatement(query); + + ResultSet rs = preparedStatement.executeQuery(); + + while 
(rs.next()) { +rowCount = Long.parseLong(rs.getString("RowCount")); +LOG.info("# Rows - " + rowCount); --- End diff -- Don't log this. > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321152#comment-15321152 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r66312905 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator+implements ActivationListener, IdleTimeHandler, Partitioner +{ + /* + * poll interval in milliseconds + */ + private int pollInterval; + + @Min(1) + private int partitionCount = 1; + protected transient int operatorId; + protected transient boolean isReplayed; + protected transient boolean isPollable; + protected int batchSize; + protected int fetchSize; + /** + * Map of windowId to of the range key + */ + protected transient MutablePair currentWindowRecoveryState; + + /** + * size of the emit queue used to hold polled records before emit + */ + private int queueCapacity = 4 * 1024 * 1024; + private transient volatile boolean execute; + private transient AtomicReference cause; + protected transient int spinMillis; + private transient OperatorContext context; + protected String tableName; + protected String key; + protected long currentWindowId; + protected KeyValPair rangeQueryPair; --- End diff -- @NotNull? > Add jdbc poller input operator >
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321135#comment-15321135 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r66311108 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator+implements ActivationListener, IdleTimeHandler, Partitioner +{ + /* + * poll interval in milliseconds + */ + private int pollInterval; + + @Min(1) + private int partitionCount = 1; + protected transient int operatorId; + protected transient boolean isReplayed; + protected transient boolean isPollable; + protected int batchSize; + protected int fetchSize; + /** + * Map of windowId to of the range key + */ + protected transient MutablePair currentWindowRecoveryState; + + /** + * size of the emit queue used to hold polled records before emit + */ + private int queueCapacity = 4 * 1024 * 1024; + private transient volatile boolean execute; + private transient AtomicReference cause; + protected transient int spinMillis; + private transient OperatorContext context; + protected String tableName; + protected String key; + protected long currentWindowId; + protected KeyValPair rangeQueryPair; + protected String lower; + protected String upper; + protected boolean recovered;
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319666#comment-15319666 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r66169324 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator+implements ActivationListener, IdleTimeHandler, Partitioner +{ + /* + * poll interval in milliseconds + */ + private int pollInterval; + + @Min(1) + private int partitionCount = 1; + protected transient int operatorId; + protected transient boolean isReplayed; + protected transient boolean isPollable; + protected int batchSize; + protected int fetchSize; + /** + * Map of windowId to of the range key + */ + protected transient MutablePair currentWindowRecoveryState; + + /** + * size of the emit queue used to hold polled records before emit + */ + private int queueCapacity = 4 * 1024 * 1024; + private transient volatile boolean execute; + private transient AtomicReference cause; + protected transient int spinMillis; + private transient OperatorContext context; + protected String tableName; + protected String key; + protected long currentWindowId; + protected KeyValPair rangeQueryPair; + protected String lower; + protected String upper; + protected boolean recovered;
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309181#comment-15309181 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user devtagare commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r65297281 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator--- End diff -- @gauravgopi123 Agreed, apologies for that. I will add the assumptions and the use case. > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308403#comment-15308403 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user gauravgopi123 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r65245106 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator--- End diff -- Why write another operator instead of enhancing the existing one? Can you please point me to the discussion that happened on the dev@ mailing list? > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308139#comment-15308139 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user devtagare commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r65223757 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator--- End diff -- This operator supports polling, partitioning and idempotency. > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305731#comment-15305731 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user tweise commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r64997100 --- Diff: library/src/test/resources/log4j.properties --- @@ -39,5 +39,5 @@ log4j.appender.SYSLOG.Facility=LOCAL1 log4j.logger.org=info #log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug +log4j.logger.com.datatorrent=info --- End diff -- Why this change? > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305729#comment-15305729 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user tweise commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r64997090 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator+implements ActivationListener, IdleTimeHandler, Partitioner +{ + /* + * poll interval in milliseconds + */ + private int pollInterval; + + @Min(1) + private int partitionCount = 1; + protected transient int operatorId; + protected transient boolean isReplayed; + protected transient boolean isPollable; + protected int batchSize; + protected int fetchSize; + /** + * Map of windowId to of the range key + */ + protected transient MutablePair currentWindowRecoveryState; + + /** + * size of the emit queue used to hold polled records before emit + */ + private int queueCapacity = 4 * 1024 * 1024; + private transient volatile boolean execute; + private transient AtomicReference cause; + protected transient int spinMillis; + private transient OperatorContext context; + protected String tableName; + protected String key; + protected long currentWindowId; + protected KeyValPair rangeQueryPair; + protected String lower; + protected String upper; + protected boolean recovered;
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303005#comment-15303005 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user gauravgopi123 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r64826973 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator--- End diff -- How is AbstractJdbcPollInputOperator different from AbstractJdbcInputOperator? > Add jdbc poller input operator > -- > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303002#comment-15303002 ] ASF GitHub Bot commented on APEXMALHAR-2066: Github user devtagare commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r64826542 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, 
partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator+implements ActivationListener, IdleTimeHandler, Partitioner +{ + /* + * poll interval in milliseconds + */ + private int pollInterval; + + @Min(1) + private int partitionCount = 1; + protected transient int operatorId; + protected transient boolean isReplayed; + protected transient boolean isPollable; + protected int batchSize; + protected int fetchSize; + /** + * Map of windowId to of the range key + */ + protected transient MutablePair currentWindowRecoveryState; + + /** + * size of the emit queue used to hold polled records before emit + */ + private int queueCapacity = 4 * 1024 * 1024; + private transient volatile boolean execute; + private transient AtomicReference cause; + protected transient int spinMillis; + private transient OperatorContext context; + protected String tableName; + protected String key; + protected long currentWindowId; + protected KeyValPair rangeQueryPair; + protected String lower; + protected String upper; + protected boolean