This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3781 by this 
push:
     new c80d024  Added support for upsert operation.
c80d024 is described below

commit c80d024816046e3929bc316d6e007f607db36f07
Author: Anil <aging...@pivotal.io>
AuthorDate: Thu Oct 26 16:24:55 2017 -0700

    Added support for upsert operation.
---
 .../apache/geode/connectors/jdbc/JDBCManager.java  | 26 +++++++++++-
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       | 49 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
index 86980e0..1b98da6 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -69,6 +69,26 @@ public class JDBCManager {
   public void write(Region region, Operation operation, Object key, PdxInstance value) {
     String tableName = getTableName(region);
     List<ColumnValue> columnList = getColumnToValueList(tableName, key, value, operation);
+    int updateCount = executeWrite(columnList, tableName, operation, false);
+    if (operation.isDestroy()) {
+      return;
+    }
+    if (updateCount <= 0) {
+      Operation upsertOp;
+      if (operation.isUpdate()) {
+        upsertOp = Operation.CREATE;
+      } else {
+        upsertOp = Operation.UPDATE;
+      }
+      updateCount = executeWrite(columnList, tableName, upsertOp, true);
+    }
+    if (updateCount != 1) {
+      throw new IllegalStateException("Unexpected updateCount " + updateCount);
+    }
+  }
+
+  private int executeWrite(List<ColumnValue> columnList, String tableName, Operation operation,
+      boolean handleException) {
     PreparedStatement pstmt = getQueryStatement(columnList, tableName, operation);
     try {
       int idx = 0;
@@ -77,8 +97,12 @@ public class JDBCManager {
         pstmt.setObject(idx, cv.getValue());
       }
       pstmt.execute();
+      return pstmt.getUpdateCount();
     } catch (SQLException e) {
-      handleSQLException(e);
+      if (handleException || operation.isDestroy()) {
+        handleSQLException(e);
+      }
+      return 0;
     } finally {
       clearStatement(pstmt);
     }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 42999a3..87b654b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -231,6 +231,55 @@ public class JDBCAsyncWriterIntegrationTest {
     assertThat(rs.next()).isFalse();
   }
 
+  @Test
+  public void canUpdateBecomeInsert() throws Exception {
+    Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    employees.put("1", pdx1);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    stmt.execute("delete from " + regionTableName + " where id = '1'");
+    validateTableRowCount(0);
+
+    PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 72).create();
+    employees.put("1", pdx3);
+
+    Awaitility.await().atMost(10, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+    assertThat(rs.next()).isTrue();
+    assertThat(rs.getString("id")).isEqualTo("1");
+    assertThat(rs.getString("name")).isEqualTo("Emp1");
+    assertThat(rs.getObject("age")).isEqualTo(72);
+    assertThat(rs.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertBecomeUpdate() throws Exception {
+    stmt.execute("Insert into " + regionTableName + " values('1', 'bogus', 11)");
+    validateTableRowCount(1);
+
+    Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    employees.put("1", pdx1);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+    assertThat(rs.next()).isTrue();
+    assertThat(rs.getString("id")).isEqualTo("1");
+    assertThat(rs.getString("name")).isEqualTo("Emp1");
+    assertThat(rs.getObject("age")).isEqualTo(55);
+    assertThat(rs.next()).isFalse();
+  }
+
   private Region createRegionWithJDBCAsyncWriter(String regionName, Properties props) {
     jdbcWriter = new JDBCAsyncWriter();
     jdbcWriter.init(props);

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.

Reply via email to