This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 88d4712  [BEAM-12385] Handle VARCHAR and other SQL specific logical types in AvroUtils
     new 62e8f84  Merge pull request #14858: [BEAM-12385] Handle VARCHAR and other SQL specific logical types in AvroUtils
88d4712 is described below

commit 88d4712147911744761cb385b8226c81e283d1fe
Author: Anant Damle <anantda...@gmail.com>
AuthorDate: Fri May 21 23:47:53 2021 +0800

    [BEAM-12385] Handle VARCHAR and other SQL specific logical types in AvroUtils
---
 CHANGES.md                                         |   3 +
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   |  43 ++++
 .../beam/sdk/schemas/utils/AvroUtilsTest.java      | 244 +++++++++++++++++++++
 .../apache/beam/sdk/io/jdbc/SchemaUtilTest.java    |  60 +++++
 4 files changed, 350 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 7029cd2..62b8e71 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,9 @@
 * `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and `AGGREGATE` are now reserved keywords. ([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)).
 * Flink 1.13 is now supported by the Flink runner ([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)).
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes:
+  `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME`
+  (Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)).
 
 ## Breaking Changes
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 77b5445..0835f9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -906,6 +906,26 @@ public class AvroUtils {
                         .map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
                         .collect(Collectors.toList()));
             break;
+          case "CHAR":
+          case "NCHAR":
+            baseType =
+                buildHiveLogicalTypeSchema("char", (int) fieldType.getLogicalType().getArgument());
+            break;
+          case "NVARCHAR":
+          case "VARCHAR":
+          case "LONGNVARCHAR":
+          case "LONGVARCHAR":
+            baseType =
+                buildHiveLogicalTypeSchema(
+                    "varchar", (int) fieldType.getLogicalType().getArgument());
+            break;
+          case "DATE":
+            baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
+            break;
+          case "TIME":
+            baseType =
+                LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+            break;
           default:
            throw new RuntimeException(
                "Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1017,6 +1037,15 @@ public class AvroUtils {
                  typeWithNullability.type.getTypes().get(oneOfValue.getCaseType().getValue()),
                  oneOfValue.getValue());
             }
+          case "NVARCHAR":
+          case "VARCHAR":
+          case "LONGNVARCHAR":
+          case "LONGVARCHAR":
+            return new Utf8((String) value);
+          case "DATE":
+            return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
+          case "TIME":
+            return (int) ((Instant) value).getMillis();
           default:
            throw new RuntimeException(
                "Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1277,4 +1306,18 @@ public class AvroUtils {
     checkArgument(
        got.equals(expected), "Can't convert '%s' to %s, expected: %s", label, got, expected);
   }
+
+  /**
+   * Helper factory to build Avro Logical types schemas for SQL *CHAR types. This method <a
+   * href="https://github.com/apache/hive/blob/5d268834a5f5278ea76399f8af0d0ab043ae0b45/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java#L110-L121">represents
+   * the logical as Hive does</a>.
+   */
+  private static org.apache.avro.Schema buildHiveLogicalTypeSchema(
+      String hiveLogicalType, int size) {
+    String schemaJson =
+        String.format(
+            "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}",
+            hiveLogicalType, size);
+    return new org.apache.avro.Schema.Parser().parse(schemaJson);
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index 627d555..c1096ca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -26,6 +26,7 @@ import com.pholser.junit.quickcheck.Property;
 import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.JDBCType;
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.Conversions;
@@ -57,8 +58,13 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.LocalTime;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -551,6 +557,164 @@ public class AvroUtilsTest {
   }
 
   @Test
+  public void testJdbcLogicalVarCharRowDataToAvroSchema() {
+    String expectedAvroSchemaJson =
+        "{ "
+            + " \"name\": \"topLevelRecord\", "
+            + " \"type\": \"record\", "
+            + " \"fields\": [{ "
+            + "   \"name\": \"my_varchar_field\", "
+            + "   \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}"
+            + "  }, "
+            + "  { "
+            + "   \"name\": \"my_longvarchar_field\", "
+            + "   \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}"
+            + "  }, "
+            + "  { "
+            + "   \"name\": \"my_nvarchar_field\", "
+            + "   \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}"
+            + "  }, "
+            + "  { "
+            + "   \"name\": \"my_longnvarchar_field\", "
+            + "   \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}"
+            + "  }, "
+            + "  { "
+            + "   \"name\": \"fixed_length_char_field\", "
+            + "   \"type\": {\"type\": \"string\", \"logicalType\": \"char\", \"maxLength\": 25}"
+            + "  } "
+            + " ] "
+            + "}";
+
+    Schema beamSchema =
+        Schema.builder()
+            .addField(
+                Field.of(
+                    "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
+            .addField(
+                Field.of(
+                    "my_longvarchar_field",
+                    FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
+            .addField(
+                Field.of(
+                    "my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
+            .addField(
+                Field.of(
+                    "my_longnvarchar_field",
+                    FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
+            .addField(
+                Field.of(
+                    "fixed_length_char_field",
+                    FieldType.logicalType(JdbcType.StringType.fixedLengthChar(25))))
+            .build();
+
+    assertEquals(
+        new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
+        AvroUtils.toAvroSchema(beamSchema));
+  }
+
+  @Test
+  public void testJdbcLogicalVarCharRowDataToGenericRecord() {
+    Schema beamSchema =
+        Schema.builder()
+            .addField(
+                Field.of(
+                    "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
+            .addField(
+                Field.of(
+                    "my_longvarchar_field",
+                    FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
+            .addField(
+                Field.of(
+                    "my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
+            .addField(
+                Field.of(
+                    "my_longnvarchar_field",
+                    FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
+            .build();
+
+    Row rowData =
+        Row.withSchema(beamSchema)
+            .addValue("varchar_value")
+            .addValue("longvarchar_value")
+            .addValue("nvarchar_value")
+            .addValue("longnvarchar_value")
+            .build();
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+    GenericRecord expectedRecord =
+        new GenericRecordBuilder(avroSchema)
+            .set("my_varchar_field", "varchar_value")
+            .set("my_longvarchar_field", "longvarchar_value")
+            .set("my_nvarchar_field", "nvarchar_value")
+            .set("my_longnvarchar_field", "longnvarchar_value")
+            .build();
+
+    assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
+  }
+
+  @Test
+  public void testJdbcLogicalDateAndTimeRowDataToAvroSchema() {
+    String expectedAvroSchemaJson =
+        "{ "
+            + " \"name\": \"topLevelRecord\", "
+            + " \"type\": \"record\", "
+            + " \"fields\": [{ "
+            + "   \"name\": \"my_date_field\", "
+            + "   \"type\": { \"type\": \"int\", \"logicalType\": \"date\" }"
+            + "  }, "
+            + "  { "
+            + "   \"name\": \"my_time_field\", "
+            + "   \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" }"
+            + "  }"
+            + " ] "
+            + "}";
+
+    Schema beamSchema =
+        Schema.builder()
+            .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
+            .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
+            .build();
+
+    assertEquals(
+        new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
+        AvroUtils.toAvroSchema(beamSchema));
+  }
+
+  @Test
+  public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
+    // Fixed test timestamp used for both the date and time fields below.
+    DateTime testDateTime = DateTime.parse("2021-05-29T11:15:16.234Z");
+
+    Schema beamSchema =
+        Schema.builder()
+            .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
+            .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
+            .build();
+
+    Row rowData =
+        Row.withSchema(beamSchema)
+            .addValue(testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
+            .addValue(Instant.ofEpochMilli(testDateTime.toLocalTime().millisOfDay().get()))
+            .build();
+
+    int daysFromEpoch =
+        Days.daysBetween(
+                Instant.EPOCH,
+                testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
+            .getDays();
+    int timeSinceMidNight = testDateTime.toLocalTime().getMillisOfDay();
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+    GenericRecord expectedRecord =
+        new GenericRecordBuilder(avroSchema)
+            .set("my_date_field", daysFromEpoch)
+            .set("my_time_field", timeSinceMidNight)
+            .build();
+
+    assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
+  }
+
+  @Test
   public void testBeamRowToGenericRecord() {
     GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);
     assertEquals(getAvroSchema(), genericRecord.getSchema());
@@ -640,4 +804,84 @@ public class AvroUtilsTest {
         AvroUtils.getFromRowFunction(GenericRecord.class),
         AvroUtils.getFromRowFunction(GenericRecord.class));
   }
+
+  /** Helper class that simulates JDBC Logical types. */
+  private static class JdbcType<T> implements Schema.LogicalType<T, T> {
+
+    private static final JdbcType<Instant> DATE =
+        new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME, "");
+    private static final JdbcType<Instant> TIME =
+        new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME, "");
+
+    private final String identifier;
+    private final FieldType argumentType;
+    private final FieldType baseType;
+    private final Object argument;
+
+    private static class StringType extends JdbcType<String> {
+
+      private static StringType fixedLengthChar(int size) {
+        return new StringType(JDBCType.CHAR, size);
+      }
+
+      private static StringType varchar(int size) {
+        return new StringType(JDBCType.VARCHAR, size);
+      }
+
+      private static StringType longvarchar(int size) {
+        return new StringType(JDBCType.LONGVARCHAR, size);
+      }
+
+      private static StringType nvarchar(int size) {
+        return new StringType(JDBCType.NVARCHAR, size);
+      }
+
+      private static StringType longnvarchar(int size) {
+        return new StringType(JDBCType.LONGNVARCHAR, size);
+      }
+
+      private StringType(JDBCType type, int size) {
+        super(type, FieldType.INT32, FieldType.STRING, size);
+      }
+    }
+
+    private JdbcType(
+        JDBCType jdbcType, FieldType argumentType, FieldType baseType, Object argument) {
+      this.identifier = jdbcType.getName();
+      this.argumentType = argumentType;
+      this.baseType = baseType;
+      this.argument = argument;
+    }
+
+    @Override
+    public String getIdentifier() {
+      return identifier;
+    }
+
+    @Override
+    public @Nullable FieldType getArgumentType() {
+      return argumentType;
+    }
+
+    @Override
+    public FieldType getBaseType() {
+      return baseType;
+    }
+
+    @Override
+    @SuppressWarnings("TypeParameterUnusedInFormals")
+    public <T1> @Nullable T1 getArgument() {
+      return (T1) argument;
+    }
+
+    @Override
+    public @NonNull T toBaseType(@NonNull T input) {
+      return input;
+    }
+
+    @Override
+    public @NonNull T toInputType(@NonNull T base) {
+      return base;
+    }
+  }
 }
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
index 7ec9e7b..18acf04 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
@@ -37,6 +37,7 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.DateTime;
@@ -224,6 +225,65 @@ public class SchemaUtilTest {
   }
 
   @Test
+  public void testJdbcLogicalTypesMapValidAvroSchemaIT() {
+    String expectedAvroSchema =
+        "{"
+            + " \"type\": \"record\","
+            + " \"name\": \"topLevelRecord\","
+            + " \"fields\": [{"
+            + "  \"name\": \"longvarchar_col\","
+            + "  \"type\": {"
+            + "   \"type\": \"string\","
+            + "   \"logicalType\": \"varchar\","
+            + "   \"maxLength\": 50"
+            + "  }"
+            + " }, {"
+            + "  \"name\": \"varchar_col\","
+            + "  \"type\": {"
+            + "   \"type\": \"string\","
+            + "   \"logicalType\": \"varchar\","
+            + "   \"maxLength\": 15"
+            + "  }"
+            + " }, {"
+            + "  \"name\": \"fixedlength_char_col\","
+            + "  \"type\": {"
+            + "   \"type\": \"string\","
+            + "   \"logicalType\": \"char\","
+            + "   \"maxLength\": 25"
+            + "  }"
+            + " }, {"
+            + "  \"name\": \"date_col\","
+            + "  \"type\": {"
+            + "   \"type\": \"int\","
+            + "   \"logicalType\": \"date\""
+            + "  }"
+            + " }, {"
+            + "  \"name\": \"time_col\","
+            + "  \"type\": {"
+            + "   \"type\": \"int\","
+            + "   \"logicalType\": \"time-millis\""
+            + "  }"
+            + " }]"
+            + "}";
+
+    Schema jdbcRowSchema =
+        Schema.builder()
+            .addField(
+                "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50))
+            .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15))
+            .addField("fixedlength_char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 25))
+            .addField("date_col", LogicalTypes.JDBC_DATE_TYPE)
+            .addField("time_col", LogicalTypes.JDBC_TIME_TYPE)
+            .build();
+
+    System.out.println(AvroUtils.toAvroSchema(jdbcRowSchema));
+
+    assertEquals(
+        new org.apache.avro.Schema.Parser().parse(expectedAvroSchema),
+        AvroUtils.toAvroSchema(jdbcRowSchema));
+  }
+
+  @Test
   public void testBeamRowMapperDateTime() throws Exception {
     long epochMilli = 1558719710000L;
 

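For readers who want to try the new mapping, here is a minimal sketch (not part of the commit) that mirrors the SchemaUtilTest case above. The class name and column names are illustrative, and it assumes the JdbcIO LogicalTypes helpers used in that test are visible from the calling code:

    import java.sql.JDBCType;
    import org.apache.beam.sdk.io.jdbc.LogicalTypes;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.utils.AvroUtils;

    class JdbcToAvroSchemaSketch {
      static org.apache.avro.Schema convert() {
        // Beam schema whose fields carry JDBC logical types.
        Schema jdbcRowSchema =
            Schema.builder()
                .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15))
                .addField("char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 25))
                .addField("date_col", LogicalTypes.JDBC_DATE_TYPE)
                .addField("time_col", LogicalTypes.JDBC_TIME_TYPE)
                .build();

        // With this change, VARCHAR/CHAR map to Avro strings carrying the Hive-style
        // "varchar"/"char" logical types, and DATE/TIME map to int with "date"/"time-millis".
        return AvroUtils.toAvroSchema(jdbcRowSchema);
      }
    }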