[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-06 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r466175841



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##
@@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, 
Value value) {
 // TODO: Doing micro to mills truncation, need to throw exception.
 ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), 
timeType, false);
 break;
+  case TYPE_DATETIME:
+// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for 
ZetaSQL DATETIME type
+// because later it will be unparsed to the string representation of 
timestamp (e.g. "SELECT
+// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT 
TIMESTAMP '2008-12-25
+// 15:30:00:00'"). So we create a wrapper function here such that 
we can later recognize
+// it and customize its unparsing in BeamBigQuerySqlDialect.
+ret =
+rexBuilder()
+.makeCall(
+SqlOperators.createZetaSqlFunction(
+BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,

Review comment:
   It's wired that the override `toString()` has never been called.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r466111720



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##
@@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, 
Value value) {
 // TODO: Doing micro to mills truncation, need to throw exception.
 ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), 
timeType, false);
 break;
+  case TYPE_DATETIME:
+// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for 
ZetaSQL DATETIME type
+// because later it will be unparsed to the string representation of 
timestamp (e.g. "SELECT
+// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT 
TIMESTAMP '2008-12-25
+// 15:30:00:00'"). So we create a wrapper function here such that 
we can later recognize
+// it and customize its unparsing in BeamBigQuerySqlDialect.
+ret =
+rexBuilder()
+.makeCall(
+SqlOperators.createZetaSqlFunction(
+BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,

Review comment:
   We could not create a class extending `SqlTimeStampLiteral` because its 
constructor is private.
   
https://github.com/apache/calcite/blob/2088488ac8327b19512a76a122cae2961fc551c3/core/src/main/java/org/apache/calcite/sql/SqlTimestampLiteral.java#L34





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465494375



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##
@@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, 
Value value) {
 // TODO: Doing micro to mills truncation, need to throw exception.
 ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), 
timeType, false);
 break;
+  case TYPE_DATETIME:
+// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for 
ZetaSQL DATETIME type
+// because later it will be unparsed to the string representation of 
timestamp (e.g. "SELECT
+// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT 
TIMESTAMP '2008-12-25
+// 15:30:00:00'"). So we create a wrapper function here such that 
we can later recognize
+// it and customize its unparsing in BeamBigQuerySqlDialect.
+ret =
+rexBuilder()
+.makeCall(
+SqlOperators.createZetaSqlFunction(
+BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,

Review comment:
   Ack. It sounds great. I will try it this way.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465494480



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
##
@@ -253,6 +259,11 @@ private void unparseTrim(SqlWriter writer, SqlCall call, 
int leftPrec, int right
 writer.endFunCall(trimFrame);
   }
 
+  private void unparseDateTimeLiteralWrapperFunction(
+  SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+writer.literal(call.operand(0).toString().replace("TIMESTAMP", 
"DATETIME"));

Review comment:
   Ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465493939



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##
@@ -442,6 +455,20 @@ private static Expression value(Expression value, 
Schema.FieldType type) {
   value, Expressions.divide(value, 
Expressions.constant(NANOS_PER_MILLISECOND)));
 } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
   return value;
+} else if (SqlTypes.DATETIME.getIdentifier().equals(logicalId)) {
+  Expression dateValue =
+  Expressions.call(value, "getValue", 
Expressions.constant(DateTime.DATE_FIELD_NAME));
+  Expression timeValue =
+  Expressions.call(value, "getValue", 
Expressions.constant(DateTime.TIME_FIELD_NAME));
+  Expression returnValue =
+  Expressions.add(
+  Expressions.multiply(
+  Types.castIfNecessary(long.class, dateValue),

Review comment:
   Done. It's much better now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465493732



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * It cannot represent an instant on the time-line without additional 
information such as an
+ * offset or time-zone.
+ *
+ * Its input type is a {@link LocalDateTime}, and base type is a {@link 
Row} containing Date
+ * field and Time field. Date field is the same as the base type of {@link 
Date}, which is a Long
+ * that represents incrementing count of days where day 0 is 1970-01-01 (ISO). 
Time field is the
+ * same as the base type of {@link Time}, which is a Long that represents a 
count of time in
+ * nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType {
+  public static final String DATE_FIELD_NAME = "Date";
+  public static final String TIME_FIELD_NAME = "Time";
+  public static final Schema DATETIME_SCHEMA =
+  
Schema.builder().addInt64Field(DATE_FIELD_NAME).addInt64Field(TIME_FIELD_NAME).build();
+
+  @Override
+  public String getIdentifier() {
+return "beam:logical_type:datetime:v1";
+  }
+
+  // unused
+  @Override
+  public Schema.FieldType getArgumentType() {
+return Schema.FieldType.STRING;
+  }
+
+  // unused
+  @Override
+  public String getArgument() {
+return "";
+  }
+
+  @Override
+  public Schema.FieldType getBaseType() {
+return Schema.FieldType.row(DATETIME_SCHEMA);
+  }
+
+  @Override
+  public Row toBaseType(LocalDateTime input) {
+return input == null
+? null
+: Row.withSchema(DATETIME_SCHEMA)
+.addValues(input.toLocalDate().toEpochDay(), 
input.toLocalTime().toNanoOfDay())
+.build();
+  }
+
+  @Override
+  public LocalDateTime toInputType(Row base) {
+return base == null
+? null
+: LocalDateTime.of(
+LocalDate.ofEpochDay(base.getValue(DATE_FIELD_NAME)),

Review comment:
   That's a great idea. It can also avoid type casting in 
`BeamCalcRel.java`. Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-04 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465319230



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * It cannot represent an instant on the time-line without additional 
information such as an
+ * offset or time-zone.
+ *
+ * Its input type is a {@link LocalDateTime}, and base type is a {@link 
Row} containing Date
+ * field and Time field. Date field is the same as the base type of {@link 
Date}, which is a Long
+ * that represents incrementing count of days where day 0 is 1970-01-01 (ISO). 
Time field is the
+ * same as the base type of {@link Time}, which is a Long that represents a 
count of time in
+ * nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType {
+  public static final String DATE_FIELD = "Date";

Review comment:
   Done.

##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
##
@@ -51,6 +52,7 @@ public void encodeAndDecode() throws Exception {
 .add("col_string_varchar", SqlTypeName.VARCHAR)
 .add("col_time", SqlTypeName.TIME)
 .add("col_date", SqlTypeName.DATE)
+.add("col_datetime", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)

Review comment:
   OK. Done.

##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##
@@ -412,32 +398,141 @@ public void testNullDatetimeFields() {
 .addNullableField("year_with_null", FieldType.INT64)
 .addField("mm", FieldType.INT64)
 .addNullableField("month_with_null", FieldType.INT64)
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, 
null).build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeDateFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("nullableDateTypeField", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+Row dateRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 
27), null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(dateRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select EXTRACT(DAY from dateTypeField) as dd, "
++ " EXTRACT(DAY from nullableDateTypeField) as 
day_with_null, "
++ " dateTypeField + interval '1' day as 
date_with_day_added, "
++ " nullableDateTypeField + interval '1' day as 
day_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
+.addField("dd", FieldType.INT64)
+.addNullableField("day_with_null", FieldType.INT64)
+.addField("date_with_day_added", 
FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("day_added_with_null", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeTimeFields() {

Review comment:
   Done.





This is an 

[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-04 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465319135



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
##
@@ -184,6 +188,19 @@ private static Value 
beamLogicalObjectToZetaSqlValue(Object object, String ident
   } else { // input type
 return 
Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) 
object));
   }
+} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+  // DateTime value
+  LocalDateTime datetime;
+  if (object instanceof Row) { // base type
+datetime =
+LocalDateTime.of(
+LocalDate.ofEpochDay(((Row) object).getValue("Date")),
+LocalTime.ofNanoOfDay(((Row) object).getValue("Time")));
+  } else { // input type
+datetime = (LocalDateTime) object;
+  }
+  return Value.createDatetimeValue(

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-04 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465309197



##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##
@@ -412,32 +398,141 @@ public void testNullDatetimeFields() {
 .addNullableField("year_with_null", FieldType.INT64)
 .addField("mm", FieldType.INT64)
 .addNullableField("month_with_null", FieldType.INT64)
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, 
null).build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeDateFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("nullableDateTypeField", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+Row dateRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 
27), null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(dateRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select EXTRACT(DAY from dateTypeField) as dd, "
++ " EXTRACT(DAY from nullableDateTypeField) as 
day_with_null, "
++ " dateTypeField + interval '1' day as 
date_with_day_added, "
++ " nullableDateTypeField + interval '1' day as 
day_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
+.addField("dd", FieldType.INT64)
+.addNullableField("day_with_null", FieldType.INT64)
+.addField("date_with_day_added", 
FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("day_added_with_null", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeTimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+.addNullableField("nullableTimeTypeField", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+Row timeRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), 
null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(timeRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select timeTypeField + interval '1' hour as 
time_with_hour_added, "
++ " nullableTimeTypeField + interval '1' hour as 
hour_added_with_null, "
++ " timeTypeField - INTERVAL '60' SECOND as 
time_with_seconds_added, "
++ " nullableTimeTypeField - INTERVAL '60' SECOND as 
seconds_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
 .addField("time_with_hour_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("hour_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
 .addField("time_with_seconds_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("seconds_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 
0), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDatetimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTimeField", 
FieldType.logicalType(SqlTypes.DATETIME))

Review comment:
   Actually is beam type 
`FieldType.logicalType(sqlTypes.DATETIME).withNullable(true)` could not be 
recognized.
   There is no label for that. Do we need to create it?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-04 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465309197



##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##
@@ -412,32 +398,141 @@ public void testNullDatetimeFields() {
 .addNullableField("year_with_null", FieldType.INT64)
 .addField("mm", FieldType.INT64)
 .addNullableField("month_with_null", FieldType.INT64)
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, 
null).build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeDateFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("nullableDateTypeField", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+Row dateRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 
27), null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(dateRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select EXTRACT(DAY from dateTypeField) as dd, "
++ " EXTRACT(DAY from nullableDateTypeField) as 
day_with_null, "
++ " dateTypeField + interval '1' day as 
date_with_day_added, "
++ " nullableDateTypeField + interval '1' day as 
day_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
+.addField("dd", FieldType.INT64)
+.addNullableField("day_with_null", FieldType.INT64)
+.addField("date_with_day_added", 
FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("day_added_with_null", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeTimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+.addNullableField("nullableTimeTypeField", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+Row timeRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), 
null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(timeRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select timeTypeField + interval '1' hour as 
time_with_hour_added, "
++ " nullableTimeTypeField + interval '1' hour as 
hour_added_with_null, "
++ " timeTypeField - INTERVAL '60' SECOND as 
time_with_seconds_added, "
++ " nullableTimeTypeField - INTERVAL '60' SECOND as 
seconds_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
 .addField("time_with_hour_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("hour_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
 .addField("time_with_seconds_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("seconds_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 
0), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDatetimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTimeField", 
FieldType.logicalType(SqlTypes.DATETIME))

Review comment:
   Actually is beam type 
`FieldType.logicalType(sqlTypes.DATETIME).withNullable(true)` could not be 
recognized.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-04 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465308051



##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##
@@ -412,32 +398,141 @@ public void testNullDatetimeFields() {
 .addNullableField("year_with_null", FieldType.INT64)
 .addField("mm", FieldType.INT64)
 .addNullableField("month_with_null", FieldType.INT64)
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, 
null).build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeDateFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("nullableDateTypeField", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+Row dateRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 
27), null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(dateRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select EXTRACT(DAY from dateTypeField) as dd, "
++ " EXTRACT(DAY from nullableDateTypeField) as 
day_with_null, "
++ " dateTypeField + interval '1' day as 
date_with_day_added, "
++ " nullableDateTypeField + interval '1' day as 
day_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
+.addField("dd", FieldType.INT64)
+.addNullableField("day_with_null", FieldType.INT64)
+.addField("date_with_day_added", 
FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("day_added_with_null", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeTimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+.addNullableField("nullableTimeTypeField", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+Row timeRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), 
null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(timeRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select timeTypeField + interval '1' hour as 
time_with_hour_added, "
++ " nullableTimeTypeField + interval '1' hour as 
hour_added_with_null, "
++ " timeTypeField - INTERVAL '60' SECOND as 
time_with_seconds_added, "
++ " nullableTimeTypeField - INTERVAL '60' SECOND as 
seconds_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
 .addField("time_with_hour_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("hour_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
 .addField("time_with_seconds_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("seconds_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 
0), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDatetimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTimeField", 
FieldType.logicalType(SqlTypes.DATETIME))

Review comment:
   No. I think its because `NULLABLE_DATE` could be recognized but 
`NULLABLE_TIMESTAMP_WITH_LOCAL_TZ` could not. 
(https://github.com/apache/beam/blob/5e0e798ddd827fd212ac89b8c6f6f2cf9e4b29a5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java#L115-L125)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific 

[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-04 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465268349



##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##
@@ -412,32 +398,141 @@ public void testNullDatetimeFields() {
 .addNullableField("year_with_null", FieldType.INT64)
 .addField("mm", FieldType.INT64)
 .addNullableField("month_with_null", FieldType.INT64)
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, 
null).build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeDateFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("nullableDateTypeField", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+Row dateRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 
27), null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(dateRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select EXTRACT(DAY from dateTypeField) as dd, "
++ " EXTRACT(DAY from nullableDateTypeField) as 
day_with_null, "
++ " dateTypeField + interval '1' day as 
date_with_day_added, "
++ " nullableDateTypeField + interval '1' day as 
day_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
+.addField("dd", FieldType.INT64)
+.addNullableField("day_with_null", FieldType.INT64)
+.addField("date_with_day_added", 
FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("day_added_with_null", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeTimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+.addNullableField("nullableTimeTypeField", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+Row timeRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), 
null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(timeRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select timeTypeField + interval '1' hour as 
time_with_hour_added, "
++ " nullableTimeTypeField + interval '1' hour as 
hour_added_with_null, "
++ " timeTypeField - INTERVAL '60' SECOND as 
time_with_seconds_added, "
++ " nullableTimeTypeField - INTERVAL '60' SECOND as 
seconds_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
 .addField("time_with_hour_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("hour_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
 .addField("time_with_seconds_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("seconds_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 
0), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDatetimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTimeField", 
FieldType.logicalType(SqlTypes.DATETIME))

Review comment:
   I planned to do that but Calcite do not have 
`NULLABLE_TIMESTAMP_WITH_LOCAL_TZ` type corresponding to 
`TIMESTAMP_WITH_LOCAL_TZ`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-04 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465268349



##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##
@@ -412,32 +398,141 @@ public void testNullDatetimeFields() {
 .addNullableField("year_with_null", FieldType.INT64)
 .addField("mm", FieldType.INT64)
 .addNullableField("month_with_null", FieldType.INT64)
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, 
null).build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeDateFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("nullableDateTypeField", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+Row dateRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 
27), null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(dateRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select EXTRACT(DAY from dateTypeField) as dd, "
++ " EXTRACT(DAY from nullableDateTypeField) as 
day_with_null, "
++ " dateTypeField + interval '1' day as 
date_with_day_added, "
++ " nullableDateTypeField + interval '1' day as 
day_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
+.addField("dd", FieldType.INT64)
+.addNullableField("day_with_null", FieldType.INT64)
+.addField("date_with_day_added", 
FieldType.logicalType(SqlTypes.DATE))
+.addNullableField("day_added_with_null", 
FieldType.logicalType(SqlTypes.DATE))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeTimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+.addNullableField("nullableTimeTypeField", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+Row timeRow =
+Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), 
null).build();
+
+PCollection outputRow =
+pipeline
+.apply(Create.of(timeRow))
+.setRowSchema(dateTimeFieldSchema)
+.apply(
+SqlTransform.query(
+"select timeTypeField + interval '1' hour as 
time_with_hour_added, "
++ " nullableTimeTypeField + interval '1' hour as 
hour_added_with_null, "
++ " timeTypeField - INTERVAL '60' SECOND as 
time_with_seconds_added, "
++ " nullableTimeTypeField - INTERVAL '60' SECOND as 
seconds_added_with_null "
++ " from PCOLLECTION"));
+
+Schema outputRowSchema =
+Schema.builder()
 .addField("time_with_hour_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("hour_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
 .addField("time_with_seconds_added", 
FieldType.logicalType(SqlTypes.TIME))
 .addNullableField("seconds_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
+.build();
+
+PAssert.that(outputRow)
+.containsInAnyOrder(
+Row.withSchema(outputRowSchema)
+.addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 
0), null)
+.build());
+
+pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDatetimeFields() {
+Schema dateTimeFieldSchema =
+Schema.builder()
+.addField("dateTimeField", 
FieldType.logicalType(SqlTypes.DATETIME))

Review comment:
   I planned to do that but Beam do not have 
`NULLABLE_TIMESTAMP_WITH_LOCAL_TZ` type corresponding to 
`TIMESTAMP_WITH_LOCAL_TZ`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-07-30 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r463375064



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
##
@@ -184,6 +188,19 @@ private static Value 
beamLogicalObjectToZetaSqlValue(Object object, String ident
   } else { // input type
 return 
Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) 
object));
   }
+} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+  // DateTime value
+  LocalDateTime datetime;
+  if (object instanceof Row) { // base type
+datetime =
+LocalDateTime.of(
+LocalDate.ofEpochDay(((Row) object).getValue("Date")),
+LocalTime.ofNanoOfDay(((Row) object).getValue("Time")));
+  } else { // input type
+datetime = (LocalDateTime) object;
+  }
+  return Value.createDatetimeValue(

Review comment:
   Ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-07-30 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r462561296



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * It cannot represent an instant on the time-line without additional 
information such as an
+ * offset or time-zone.
+ *
+ * Its input type is a {@link LocalDateTime}, and base type is a {@link 
Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing 
count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time 
in nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType {
+  private final Schema schema =

Review comment:
   Thanks. Done.

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * It cannot represent an instant on the time-line without additional 
information such as an
+ * offset or time-zone.
+ *
+ * Its input type is a {@link LocalDateTime}, and base type is a {@link 
Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing 
count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time 
in nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType {
+  private final Schema schema =
+  Schema.builder().addInt64Field("Date").addInt64Field("Time").build();

Review comment:
   Done.

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *