[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/282


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-13 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70569998
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for for consuming data using JDBC interface
+ * User needs User needs to provide
+ * tableName,dbConnection,setEmitColumnList,look-up key 
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
+ * 
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list
+ * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC
+ * 
+ * In the next iterations this operator would support an in-clause for
+ * idempotency instead of having only range query support to support non ordered
+ * key columns
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+@Evolving
+public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator
+implements ActivationListener, IdleTimeHandler, Partitioner
+{
+  /**
+   * poll interval in milliseconds
+   */
+  private static int pollInterval = 1;
+
+  @Min(1)
+  private int partitionCount = 1;
+  protected transient int operatorId;
+  protected transient boolean isReplayed;
+  protected transient boolean isPollable;
+  protected int batchSize;
+  protected static int fetchSize = 2;
+  /**
+   * Map of windowId to  of the range key
+   */
+  protected transient MutablePair currentWindowRecoveryState;
+
+  /**
+   * size of the emit queue used to hold polled records before emit
+   */
+  private static int queueCapacity = 4 * 1024 * 1024;
+  private transient volatile boolean execute;
+  private transient AtomicReference cause;
+  protected transient int spinMillis;
+  private transient OperatorContext context;
+  protected String tableName;
+  protected String key;
+  protected long currentWindowId;
+  protected KeyValPair rangeQueryPair;
+  protected String lower;
+  

[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
Github user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70488149
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for for consuming data using JDBC interface
+ * User needs User needs to provide
+ * tableName,dbConnection,setEmitColumnList,look-up key 
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
+ * 
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list
+ * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator
--- End diff --

Done
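
For reference, the Javadoc quoted in the diff above says the operator uses static partitioning to arrive at range queries over an ordered key column. A minimal sketch of how a row count could be split into contiguous per-partition ranges is given here. The class `RangePartitionSketch` and the method `splitIntoRanges` are hypothetical names used only for illustration; this is an even split under the assumption that rows are addressed by offset, not the logic from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not code from the pull request.
final class RangePartitionSketch
{
  private RangePartitionSketch()
  {
  }

  /**
   * Splits row offsets [0, rowCount) into partitionCount contiguous
   * half-open ranges, each returned as {lowerInclusive, upperExclusive}.
   * Any remainder is spread over the first partitions, one row each.
   */
  static List<long[]> splitIntoRanges(long rowCount, int partitionCount)
  {
    if (rowCount < 0 || partitionCount < 1) {
      throw new IllegalArgumentException("rowCount must be >= 0 and partitionCount >= 1");
    }
    List<long[]> ranges = new ArrayList<>(partitionCount);
    long base = rowCount / partitionCount;
    long remainder = rowCount % partitionCount;
    long lower = 0;
    for (int i = 0; i < partitionCount; i++) {
      long size = base + (i < remainder ? 1 : 0);
      ranges.add(new long[] {lower, lower + size});
      lower += size;
    }
    return ranges;
  }
}
```

With 10 rows and 3 partitions this yields the offset ranges [0, 4), [4, 7) and [7, 10). Each offset range would still have to be translated into key-column bounds before a range query can be formed.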




[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
Github user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70483447
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+DB_CONNECTION = dbConnection;
+DB_USER = userName;
+DB_PASSWORD = password;
+TABLE_NAME = tableName;
+KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+Connection dbConnection = null;
+
+try {
+  Class.forName(DB_DRIVER);
+} catch (ClassNotFoundException e) {
+  LOG.error("Driver not found", e);
+}
+
+try {
+  dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+  return dbConnection;
+} catch (SQLException e) {
+  LOG.error("Exception in getting connection handle", e);
+}
+
+return dbConnection;
+
+  }
+
+  private static String generateQueryString()
+  {
+StringBuilder sb = new StringBuilder();
+sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
+
+if (WHERE_CLAUSE != null) {
+  sb.append(" WHERE " + WHERE_CLAUSE);
+}
+
+return sb.toString();
+  }
+
+  /**
+   * Finds the total number of rows in the table
+   */
+  private static long getRecordRange(String query) throws SQLException
+  {
+long rowCount = 0;
+Connection dbConnection = null;
+PreparedStatement preparedStatement = null;
+
+try {
+  dbConnection = getDBConnection();
+  preparedStatement = dbConnection.prepareStatement(query);
+
+  ResultSet rs = preparedStatement.executeQuery();
+
+  while (rs.next()) {
+rowCount = Long.parseLong(rs.getString("RowCount"));
+LOG.info("# Rows - " + rowCount);
+  }
+
+} catch (SQLException e) {
+  LOG.error("Exception in retreiving result set", e);
+} finally {
+  if (preparedStatement != null) {
+preparedStatement.close();
+  }
+  if (dbConnection != null) {
+dbConnection.close();
+  }
+}
+return rowCount;
+  }
+
+  /**
+   * Returns a pair of  bounds for each partition of the
+   * {@link JdbcPollInputOperator}}
+   */
+  private static KeyValPair getQueryBounds(long lower, long upper) throws SQLException
+  {
+Connection dbConnectionLower = null;
+

[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
Github user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70483210
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+DB_CONNECTION = dbConnection;
+DB_USER = userName;
+DB_PASSWORD = password;
+TABLE_NAME = tableName;
+KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+Connection dbConnection = null;
+
+try {
+  Class.forName(DB_DRIVER);
+} catch (ClassNotFoundException e) {
+  LOG.error("Driver not found", e);
--- End diff --

Done





[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
Github user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70483252
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+DB_CONNECTION = dbConnection;
+DB_USER = userName;
+DB_PASSWORD = password;
+TABLE_NAME = tableName;
+KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+Connection dbConnection = null;
+
+try {
+  Class.forName(DB_DRIVER);
+} catch (ClassNotFoundException e) {
+  LOG.error("Driver not found", e);
+}
+
+try {
+  dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+  return dbConnection;
+} catch (SQLException e) {
+  LOG.error("Exception in getting connection handle", e);
--- End diff --

Done





[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70385310
  
--- Diff: library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java ---
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link AbstractJdbcPollInputOperator} and
+ * {@link JdbcPollInputOperator}
+ */
+public class JdbcPollerTest
+{
+  public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+
+  private static final String TABLE_NAME = "test_account_table";
+  private static String APP_ID = "JdbcPollingOperatorTest";
+  public String dir = null;
+
+  @BeforeClass
+  public static void setup()
+  {
+try {
+  cleanup();
+} catch (Exception e) {
+  throw new RuntimeException(e);
+}
+try {
+  Class.forName(DB_DRIVER).newInstance();
+
+  Connection con = DriverManager.getConnection(URL);
+  Statement stmt = con.createStatement();
+
+  String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+  + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)";
+  stmt.executeUpdate(createTable);
+
+} catch (Exception e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @AfterClass
+  public static void cleanup()
+  {
+try {
+  FileUtils.deleteDirectory(new File("target/" + APP_ID));
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  public static void cleanTable()
+  {
+try {
+  Connection con = DriverManager.getConnection(URL);
+  Statement stmt = con.createStatement();
+  String cleanTable = "delete from " + TABLE_NAME;
+  stmt.executeUpdate(cleanTable);
+} catch (SQLException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  public static void insertEventsInTable(int numEvents, int offset)
+  {
+try {
+  Connection con = DriverManager.getConnection(URL);
+  String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+  PreparedStatement stmt = con.prepareStatement(insert);
+  for (int i = 0; i < numEvents; i++, offset++) {
+stmt.setInt(1, offset);
+stmt.setString(2, "Account_Holder-" + offset);
+stmt.setInt(3, (offset * 1000));
+stmt.executeUpdate();
+  }
+} catch (SQLException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  /**
+   * Simulates actual application flow Adds a batch query partitiom, a pollable
+   * partition Incremental record polling is also checked
+   */
+  @Test
+  public void testJdbcPollingInputOperatorBatch() throws InterruptedException
+  {
+cleanTable();
+insertEventsInTable(10, 0);
+JdbcStore store = new 

[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-06-10 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r66695465
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+DB_CONNECTION = dbConnection;
+DB_USER = userName;
+DB_PASSWORD = password;
+TABLE_NAME = tableName;
+KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+Connection dbConnection = null;
+
+try {
+  Class.forName(DB_DRIVER);
+} catch (ClassNotFoundException e) {
+  LOG.error("Driver not found", e);
+}
+
+try {
+  dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+  return dbConnection;
+} catch (SQLException e) {
+  LOG.error("Exception in getting connection handle", e);
+}
+
+return dbConnection;
+
+  }
+
+  private static String generateQueryString()
+  {
+StringBuilder sb = new StringBuilder();
+sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
+
+if (WHERE_CLAUSE != null) {
+  sb.append(" WHERE " + WHERE_CLAUSE);
+}
+
+return sb.toString();
+  }
+
+  /**
+   * Finds the total number of rows in the table
+   */
+  private static long getRecordRange(String query) throws SQLException
+  {
+long rowCount = 0;
+Connection dbConnection = null;
+PreparedStatement preparedStatement = null;
+
+try {
+  dbConnection = getDBConnection();
+  preparedStatement = dbConnection.prepareStatement(query);
+
+  ResultSet rs = preparedStatement.executeQuery();
+
+  while (rs.next()) {
+rowCount = Long.parseLong(rs.getString("RowCount"));
+LOG.info("# Rows - " + rowCount);
--- End diff --

Don't log this.




[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-06-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r66364498
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+DB_CONNECTION = dbConnection;
+DB_USER = userName;
+DB_PASSWORD = password;
+TABLE_NAME = tableName;
+KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+Connection dbConnection = null;
+
+try {
+  Class.forName(DB_DRIVER);
+} catch (ClassNotFoundException e) {
+  LOG.error("Driver not found", e);
+}
+
+try {
+  dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+  return dbConnection;
+} catch (SQLException e) {
+  LOG.error("Exception in getting connection handle", e);
+}
+
+return dbConnection;
+
+  }
+
+  private static String generateQueryString()
+  {
+StringBuilder sb = new StringBuilder();
+sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
+
+if (WHERE_CLAUSE != null) {
+  sb.append(" WHERE " + WHERE_CLAUSE);
+}
+
+return sb.toString();
+  }
+
+  /**
+   * Finds the total number of rows in the table
+   */
+  private static long getRecordRange(String query) throws SQLException
+  {
+long rowCount = 0;
+Connection dbConnection = null;
+PreparedStatement preparedStatement = null;
+
+try {
+  dbConnection = getDBConnection();
+  preparedStatement = dbConnection.prepareStatement(query);
+
+  ResultSet rs = preparedStatement.executeQuery();
+
+  while (rs.next()) {
+rowCount = Long.parseLong(rs.getString("RowCount"));
--- End diff --

I suggest reading the value by column index instead of by column alias. Also, this looks like a simple count utility. Can you rename it to something like "getTupleCount()"?
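
A minimal sketch of what the suggestion above could look like, assuming the caller supplies an open `Connection`. The class `TupleCountSketch` and the helpers `buildCountQuery` and `readCount` are hypothetical names, not code from the pull request. The count is read by column index with `getLong(1)`, so the `RowCount` alias is no longer needed, and try-with-resources closes the statement and result set.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical illustration; not code from the pull request.
final class TupleCountSketch
{
  private TupleCountSketch()
  {
  }

  /** Builds the COUNT(*) query, appending the WHERE clause only if one is given. */
  static String buildCountQuery(String tableName, String whereClause)
  {
    StringBuilder sb = new StringBuilder("SELECT COUNT(*) FROM ").append(tableName);
    if (whereClause != null && !whereClause.isEmpty()) {
      sb.append(" WHERE ").append(whereClause);
    }
    return sb.toString();
  }

  /** Reads the count from the first column of the first row, or 0 if there is no row. */
  static long readCount(ResultSet rs) throws SQLException
  {
    return rs.next() ? rs.getLong(1) : 0L;
  }

  /** Runs the count query; the connection is owned and closed by the caller. */
  static long getTupleCount(Connection connection, String query) throws SQLException
  {
    try (PreparedStatement statement = connection.prepareStatement(query);
        ResultSet rs = statement.executeQuery()) {
      return readCount(rs);
    }
  }
}
```

Letting the `SQLException` propagate, instead of logging it and returning 0, is a design choice of this sketch: it lets a caller tell an empty table apart from a failed query.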




[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r66169324
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for consuming data using the JDBC interface.
+ * User needs to provide
+ * tableName, dbConnection, setEmitColumnList, look-up key.
+ * Optionally batchSize, pollInterval, look-up key and a where clause can be given.
+ * 
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads.
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed.
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list.
+ * Range queries are formed using the {@link JdbcMetaDataUtility}. Output -
+ * comma separated list of the emit columns, e.g. columnA,columnB,columnC
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator
+implements ActivationListener, IdleTimeHandler, Partitioner
+{
+  /*
+   * poll interval in milliseconds
+   */
+  private int pollInterval;
+
+  @Min(1)
+  private int partitionCount = 1;
+  protected transient int operatorId;
+  protected transient boolean isReplayed;
+  protected transient boolean isPollable;
+  protected int batchSize;
+  protected int fetchSize;
+  /**
+   * Map of windowId to  of the range key
+   */
+  protected transient MutablePair currentWindowRecoveryState;
+
+  /**
+   * size of the emit queue used to hold polled records before emit
+   */
+  private int queueCapacity = 4 * 1024 * 1024;
+  private transient volatile boolean execute;
+  private transient AtomicReference cause;
+  protected transient int spinMillis;
+  private transient OperatorContext context;
+  protected String tableName;
+  protected String key;
+  protected long currentWindowId;
+  protected KeyValPair rangeQueryPair;
+  protected String lower;
+  protected String upper;
+  protected boolean recovered;
+  protected boolean isPolled;
+  protected String whereCondition = null;
--- End diff --

Can you also clarify which subset of SQL will be supported here? How 
complex can the where clause be?
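
To make the question concrete, here is a minimal sketch of how a user-supplied
where clause might be combined with the range predicate on the key column.
This is an assumption for illustration only, not how the PR builds its
queries: RangeQuerySketch, buildRangeQuery, the numeric bounds and the
half-open range are all hypothetical. A clause containing OR shows why
parenthesizing it matters.

```java
// Hypothetical sketch; not taken from the PR.
class RangeQuerySketch
{
  // Combines a half-open key range [lowerInclusive, upperExclusive) with an
  // optional user clause. The clause is wrapped in parentheses so that an OR
  // inside it cannot escape the range restriction.
  static String buildRangeQuery(String columns, String table, String keyColumn,
      long lowerInclusive, long upperExclusive, String whereClause)
  {
    StringBuilder sb = new StringBuilder("SELECT ").append(columns)
        .append(" FROM ").append(table)
        .append(" WHERE ").append(keyColumn).append(" >= ").append(lowerInclusive)
        .append(" AND ").append(keyColumn).append(" < ").append(upperExclusive);
    if (whereClause != null && !whereClause.trim().isEmpty()) {
      sb.append(" AND (").append(whereClause).append(")");
    }
    return sb.toString();
  }
}
```

The sketch concatenates the clause as-is, so it assumes the clause comes from
trusted operator configuration. Anything beyond a simple predicate on the same
table (subqueries, joins, ORDER BY) would need to be explicitly allowed or
rejected.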


---
If your project is set up for it, you