This is an automated email from the ASF dual-hosted git repository. agingade pushed a commit to branch feature/GEODE-3781 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
     new c80d024  Added support for upsert operation.
c80d024 is described below

commit c80d024816046e3929bc316d6e007f607db36f07
Author: Anil <aging...@pivotal.io>
AuthorDate: Thu Oct 26 16:24:55 2017 -0700

    Added support for upsert operation.
---
 .../apache/geode/connectors/jdbc/JDBCManager.java  | 26 +++++++++++-
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       | 49 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
index 86980e0..1b98da6 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -69,6 +69,26 @@ public class JDBCManager {
   public void write(Region region, Operation operation, Object key, PdxInstance value) {
     String tableName = getTableName(region);
     List<ColumnValue> columnList = getColumnToValueList(tableName, key, value, operation);
+    int updateCount = executeWrite(columnList, tableName, operation, false);
+    if (operation.isDestroy()) {
+      return;
+    }
+    if (updateCount <= 0) {
+      Operation upsertOp;
+      if (operation.isUpdate()) {
+        upsertOp = Operation.CREATE;
+      } else {
+        upsertOp = Operation.UPDATE;
+      }
+      updateCount = executeWrite(columnList, tableName, upsertOp, true);
+    }
+    if (updateCount != 1) {
+      throw new IllegalStateException("Unexpected updateCount " + updateCount);
+    }
+  }
+
+  private int executeWrite(List<ColumnValue> columnList, String tableName, Operation operation,
+      boolean handleException) {
     PreparedStatement pstmt = getQueryStatement(columnList, tableName, operation);
     try {
       int idx = 0;
@@ -77,8 +97,12 @@ public class JDBCManager {
         pstmt.setObject(idx, cv.getValue());
       }
       pstmt.execute();
+      return pstmt.getUpdateCount();
     } catch (SQLException e) {
-      handleSQLException(e);
+      if (handleException || operation.isDestroy()) {
+        handleSQLException(e);
+      }
+      return 0;
     } finally {
       clearStatement(pstmt);
     }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 42999a3..87b654b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -231,6 +231,55 @@ public class JDBCAsyncWriterIntegrationTest {
     assertThat(rs.next()).isFalse();
   }

+  @Test
+  public void canUpdateBecomeInsert() throws Exception {
+    Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    employees.put("1", pdx1);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    stmt.execute("delete from " + regionTableName + " where id = '1'");
+    validateTableRowCount(0);
+
+    PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 72).create();
+    employees.put("1", pdx3);
+
+    Awaitility.await().atMost(10, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+    assertThat(rs.next()).isTrue();
+    assertThat(rs.getString("id")).isEqualTo("1");
+    assertThat(rs.getString("name")).isEqualTo("Emp1");
+    assertThat(rs.getObject("age")).isEqualTo(72);
+    assertThat(rs.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertBecomeUpdate() throws Exception {
+    stmt.execute("Insert into " + regionTableName + " values('1', 'bogus', 11)");
+    validateTableRowCount(1);
+
+    Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    employees.put("1", pdx1);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+    assertThat(rs.next()).isTrue();
+    assertThat(rs.getString("id")).isEqualTo("1");
+    assertThat(rs.getString("name")).isEqualTo("Emp1");
+    assertThat(rs.getObject("age")).isEqualTo(55);
+    assertThat(rs.next()).isFalse();
+  }
+
   private Region createRegionWithJDBCAsyncWriter(String regionName, Properties props) {
     jdbcWriter = new JDBCAsyncWriter();
     jdbcWriter.init(props);

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].