timoninmaxim commented on code in PR #317: URL: https://github.com/apache/ignite-extensions/pull/317#discussion_r2254931594
########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java: ########## @@ -0,0 +1,264 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.RunnableX; + +/** */ +public class JavaToSqlTypeMapper { + /** */ + private static final int NO_SQL_TYPE = -1; + + /** */ + private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new HashMap<>(); + + static { + for (JavaToSqlType type : JavaToSqlType.values()) + JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type); + } + + /** + * Sets a value in the PreparedStatement at the given index using the appropriate setter + * based on the runtime type of the object. + * @param stmt {@link PreparedStatement} + * @param idx value index in {@link PreparedStatement} + * @param obj value + */ + public void setEventFieldValue(PreparedStatement stmt, Integer idx, Object obj) { Review Comment: Which `Event` do you mean? ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -46,58 +43,6 @@ /** */ public class IgniteToPostgreSqlCdcApplier { Review Comment: Make class package-private ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java: ########## @@ -0,0 +1,264 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.RunnableX; + +/** */ +public class JavaToSqlTypeMapper { Review Comment: package-private ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java: ########## @@ -0,0 +1,264 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.RunnableX; + +/** */ +public class JavaToSqlTypeMapper { + /** */ + private static final int NO_SQL_TYPE = -1; + + /** */ + private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new HashMap<>(); + + static { + for (JavaToSqlType type : JavaToSqlType.values()) + JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type); + } + + /** + * Sets a value in the PreparedStatement at the given index using the appropriate setter + * based on the runtime type of the object. + * @param stmt {@link PreparedStatement} + * @param idx value index in {@link PreparedStatement} + * @param obj value + */ + public void setEventFieldValue(PreparedStatement stmt, Integer idx, Object obj) { Review Comment: Integer -> int ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -445,25 +353,18 @@ private void addFieldsAndTypes(QueryEntity entity, StringBuilder sql) { while (iter.hasNext()) { field = iter.next(); - type = JAVA_TO_SQL_TYPES.getOrDefault(field.getValue(), DFLT_SQL_TYPE); - - sql.append(field.getKey()).append(" ").append(type); precision = entity.getFieldsPrecision().get(field.getKey()); scale = entity.getFieldsScale().get(field.getKey()); - if (precision != null && precision > 0) { - if (SQL_TYPES_WITH_PRECISION_ONLY.contains(type)) - sql.append("(").append(precision).append(")"); - else if (SQL_TYPES_WITH_PRECISION_AND_SCALE.contains(type)) { - sql.append("(").append(precision); + if (precision != null && scale != null) Review Comment: All if-conditions can be encapsulated within the mapper ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java: ########## @@ -0,0 +1,264 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.RunnableX; + +/** */ +public class JavaToSqlTypeMapper { + /** */ + private static final int NO_SQL_TYPE = -1; + + /** */ + private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new HashMap<>(); + + static { + for (JavaToSqlType type : JavaToSqlType.values()) + JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type); + } + + /** + * Sets a value in the PreparedStatement at the given index using the appropriate setter + * based on the runtime type of the object. + * @param stmt {@link PreparedStatement} + * @param idx value index in {@link PreparedStatement} + * @param obj value + */ + public void setEventFieldValue(PreparedStatement stmt, Integer idx, Object obj) { + if (obj == null) { + setSafe(() -> stmt.setNull(idx, Types.NULL)); + + return; + } + + int types = JAVA_TO_SQL_TYPE_MAP.get(obj.getClass().getName()).types(); Review Comment: NPE is possible ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java: ########## @@ -0,0 +1,264 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.RunnableX; + +/** */ +public class JavaToSqlTypeMapper { + /** */ + private static final int NO_SQL_TYPE = -1; + + /** */ + private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new HashMap<>(); + + static { + for (JavaToSqlType type : JavaToSqlType.values()) + JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type); + } + + /** + * Sets a value in the PreparedStatement at the given index using the appropriate setter + * based on the runtime type of the object. + * @param stmt {@link PreparedStatement} + * @param idx value index in {@link PreparedStatement} + * @param obj value + */ + public void setEventFieldValue(PreparedStatement stmt, Integer idx, Object obj) { + if (obj == null) { + setSafe(() -> stmt.setNull(idx, Types.NULL)); + + return; + } + + int types = JAVA_TO_SQL_TYPE_MAP.get(obj.getClass().getName()).types(); + + if (types != -1) + setSafe(() -> stmt.setObject(idx, obj, types)); + else if (obj instanceof Duration) { + Duration dur = (Duration)obj; + + BigDecimal durVal = BigDecimal.valueOf(dur.getSeconds()).add(BigDecimal.valueOf(dur.getNano(), 9)); + + setSafe(() -> stmt.setBigDecimal(idx, durVal)); + } + else if (obj instanceof byte[]) + setSafe(() -> stmt.setBytes(idx, (byte[])obj)); Review Comment: The types map do have BYTEA type. Can we reach this place? ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java: ########## @@ -0,0 +1,264 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.RunnableX; + +/** */ +public class JavaToSqlTypeMapper { + /** */ + private static final int NO_SQL_TYPE = -1; + + /** */ + private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new HashMap<>(); + + static { + for (JavaToSqlType type : JavaToSqlType.values()) + JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type); + } + + /** + * Sets a value in the PreparedStatement at the given index using the appropriate setter + * based on the runtime type of the object. + * @param stmt {@link PreparedStatement} + * @param idx value index in {@link PreparedStatement} + * @param obj value + */ + public void setEventFieldValue(PreparedStatement stmt, Integer idx, Object obj) { + if (obj == null) { + setSafe(() -> stmt.setNull(idx, Types.NULL)); + + return; + } + + int types = JAVA_TO_SQL_TYPE_MAP.get(obj.getClass().getName()).types(); + + if (types != -1) + setSafe(() -> stmt.setObject(idx, obj, types)); + else if (obj instanceof Duration) { + Duration dur = (Duration)obj; + + BigDecimal durVal = BigDecimal.valueOf(dur.getSeconds()).add(BigDecimal.valueOf(dur.getNano(), 9)); + + setSafe(() -> stmt.setBigDecimal(idx, durVal)); + } + else if (obj instanceof byte[]) + setSafe(() -> stmt.setBytes(idx, (byte[])obj)); + else + setSafe(() -> stmt.setObject(idx, obj)); + } + + /** + * Renders the SQL type for a given Java class name, including both precision and scale if supported. + * + * @param clsName The fully qualified Java class name (e.g., {@code java.math.BigDecimal}). + * @param precision The numeric precision to include in the SQL type declaration. + * @param scale The numeric scale to include in the SQL type declaration. + * @return A SQL type string (e.g., {@code DECIMAL (10, 2)}) corresponding to the given class and numeric metadata. + * If the SQL type does not support scale, {@link #renderSqlType(String, int)} is used instead. + */ + public String renderSqlType(String clsName, int precision, int scale) { + JavaToSqlType type = JAVA_TO_SQL_TYPE_MAP.get(clsName); + + if (!type.scale()) + return renderSqlType(clsName, precision); + + return type.sqlType().replace("?", String.format("(%d, %d)", precision, scale)); + } + + /** + * Renders the SQL type for a given Java class name, including precision if supported. + * + * @param clsName The fully qualified Java class name (e.g., {@code java.lang.String}). + * @param precision The numeric precision or length to include in the SQL type declaration. + * @return A SQL type string (e.g., {@code VARCHAR (255)}) corresponding to the given class and precision. + * If the SQL type does not support precision, {@link #renderSqlType(String)} is used instead. + */ + public String renderSqlType(String clsName, int precision) { + JavaToSqlType type = JAVA_TO_SQL_TYPE_MAP.get(clsName); + + if (!type.precision()) + return renderSqlType(clsName); + + return type.sqlType().replace("?", String.format("(%d)", precision)); + } + + /** + * Renders the SQL type for a given Java class name without any precision or scale. + * + * @param clsName The fully qualified Java class name (e.g., {@code java.lang.Integer}). + * @return A SQL type string (e.g., {@code INTEGER}, {@code VARCHAR}, or {@code DECIMAL}) with or without + * placeholders removed. If the mapped SQL type includes a precision placeholder, it will be removed. + */ + public String renderSqlType(String clsName) { + JavaToSqlType type = JAVA_TO_SQL_TYPE_MAP.get(clsName); + + if (type.precision()) + return type.sqlType().replace("?", ""); + + return type.sqlType(); + } + + /** + * Executes the given operation that may throw an exception, converting any thrown {@link Throwable} + * into an {@link IgniteException}. + * + * @param op the operation to execute, represented as a {@link RunnableX}. + * @throws IgniteException if the operation throws any exception. + */ + private void setSafe(RunnableX op) { Review Comment: Can be inlined ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java: ########## @@ -0,0 +1,264 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.RunnableX; + +/** */ +public class JavaToSqlTypeMapper { + /** */ + private static final int NO_SQL_TYPE = -1; + + /** */ + private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new HashMap<>(); + + static { + for (JavaToSqlType type : JavaToSqlType.values()) + JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type); + } + + /** + * Sets a value in the PreparedStatement at the given index using the appropriate setter + * based on the runtime type of the object. + * @param stmt {@link PreparedStatement} + * @param idx value index in {@link PreparedStatement} + * @param obj value + */ + public void setEventFieldValue(PreparedStatement stmt, Integer idx, Object obj) { + if (obj == null) { + setSafe(() -> stmt.setNull(idx, Types.NULL)); + + return; + } + + int types = JAVA_TO_SQL_TYPE_MAP.get(obj.getClass().getName()).types(); + + if (types != -1) + setSafe(() -> stmt.setObject(idx, obj, types)); + else if (obj instanceof Duration) { + Duration dur = (Duration)obj; Review Comment: Why Duration converts to BigDecimal? ########## modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapperTest.java: ########## @@ -0,0 +1,357 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.Period; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.junit.Test; + +import static org.apache.ignite.cdc.postgresql.JavaToSqlTypeMapperTest.NumericMeta.NO_NUMERIC_META; +import static org.apache.ignite.cdc.postgresql.JavaToSqlTypeMapperTest.NumericMeta.PRECISION_AND_SCALE; +import static org.apache.ignite.cdc.postgresql.JavaToSqlTypeMapperTest.NumericMeta.PRECISION_ONLY; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +public class JavaToSqlTypeMapperTest extends CdcPostgreSqlReplicationAbstractTest { + /** */ + private final JavaToSqlTypeMapper javaToSqlTypeMapper = new JavaToSqlTypeMapper(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + DataRegionConfiguration dataRegionConfiguration = new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setCdcEnabled(true); + + DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration() + .setWalForceArchiveTimeout(5_000) + .setDefaultDataRegionConfiguration(dataRegionConfiguration); + + cfg.setDataStorageConfiguration(dataStorageConfiguration); + cfg.setConsistentId(igniteInstanceName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected IgniteToPostgreSqlCdcConsumer getCdcConsumerConfiguration() { + IgniteToPostgreSqlCdcConsumer cdcCfg = super.getCdcConsumerConfiguration(); + + cdcCfg.setCreateTables(true); + + return cdcCfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void javaToPostgreSqlTypesMappingTest() throws Exception { Review Comment: Too long test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org