[GitHub] flink issue #4746: [FLINK-7657] [Table] Adding logic to convert RexLiteral t...

2017-10-10 Thread kmurra
Github user kmurra commented on the issue:

https://github.com/apache/flink/pull/4746
  
The biggest change here is in the test cases -- I generalized the test 
table source to have some basic filtering logic and allow for generic datasets.

I moved the Literal build logic to RexNodeToExpressionConverter.visitLiteral.  I also 
rewrote several of the conversion methods to more closely align with the intended 
behavior of the code: preserving the string values of the various time-related literals 
in the local time zone.  This made a number of the epoch-millisecond modifications 
go away.
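
A minimal sketch of the invariant being described (illustrative only, not the PR's code; java.sql.Date.valueOf stands in for the converter's output):

    import java.sql.Date

    // The string form of a time-related literal should survive the round trip through
    // the converter unchanged, regardless of the JVM's default time zone.
    val literalText = "2017-09-27"                  // what the SQL query contained
    val converted: Date = Date.valueOf(literalText) // stand-in for the converted literal value
    assert(converted.toString == literalText)       // field values preserved in local time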


---


[GitHub] flink issue #4746: [FLINK-7657] [Table] Adding logic to convert RexLiteral t...

2017-10-04 Thread kmurra
Github user kmurra commented on the issue:

https://github.com/apache/flink/pull/4746
  
Regarding the time zones, I think I understand your argument here.  Is 
there anything in particular you would want me to change overall, beyond what you 
have already outlined, to account for that? I do want to document why we're 
doing any time-zone conversions, since it took me some time to 
understand why it was being done (it looked incorrect to me and several 
other developers at first glance).

Also, I noticed that Calcite's fromCalendarFields simply takes the fields 
directly from the Calendar, so time-zone adjustments are unnecessary 
after my changes to toRexNode.  I'll fix that as well in my next 
commit.
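
A small sketch of that observation (assuming Calcite's DateString API; not the PR's code):

    import java.util.Calendar

    import org.apache.calcite.util.DateString

    // fromCalendarFields reads the YEAR/MONTH/DAY fields of the Calendar as-is,
    // so the Calendar's time zone never enters the result and no offset math is needed.
    val cal = Calendar.getInstance()   // default (local) time zone
    cal.clear()
    cal.set(2017, Calendar.SEPTEMBER, 27)
    val dateString = DateString.fromCalendarFields(cal)  // "2017-09-27" in any zone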


---


[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

2017-10-04 Thread kmurra
Github user kmurra commented on a diff in the pull request:

https://github.com/apache/flink/pull/4746#discussion_r142801455
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
@@ -49,10 +50,51 @@ object Literal {
     case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
     case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
   }
+
+  private[flink] def apply(rexNode: RexLiteral): Literal = {
+    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)
+
+    val literalValue = literalType match {
+      // Chrono use cases.  We're force-adjusting the UTC-based epoch timestamps to a new
+      // timestamp such that we get the same year/month/hour/day field values in the query's
+      // timezone (UTC)
+      case _@SqlTimeTypeInfo.DATE =>
+        val rexValue = rexNode.getValueAs(classOf[DateString])
+        val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault)
+        new Date(adjustedCal.getTimeInMillis)
--- End diff --

Do you want me to use the deprecated constructor or leave this as-is?


---


[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

2017-10-04 Thread kmurra
Github user kmurra commented on a diff in the pull request:

https://github.com/apache/flink/pull/4746#discussion_r142801313
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
@@ -49,10 +50,51 @@ object Literal {
     case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
     case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
   }
+
+  private[flink] def apply(rexNode: RexLiteral): Literal = {
--- End diff --

I'll commit to using your standards for the codebase.  However, allow me 
to voice a disagreement here:

The Literal class does the conversion from the Literal back to the 
RexLiteral.  Having this logic specifically in RexNodeToExpressionConverter 
means the RexLiteral-to-Literal logic is physically split from the 
Literal-to-RexLiteral logic.  That makes it slightly easier for a contributor 
to change one side of the conversion without accounting for the 
other.  In particular, the date adjustments here become harder to understand 
since the context is split between two different files.
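
Purely to illustrate the locality argument, a hypothetical layout (not actual Flink code) where both directions sit side by side:

    import org.apache.calcite.rex.RexLiteral
    import org.apache.flink.table.expressions.Literal

    // Hypothetical sketch: keeping both conversion directions in one object makes it
    // hard to change one side without seeing the other.
    object LiteralConversions {
      def fromRexLiteral(rex: RexLiteral): Literal = ???   // RexLiteral -> Literal
      def toRexLiteral(literal: Literal): RexLiteral = ??? // Literal -> RexLiteral
    }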



---


[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

2017-10-04 Thread kmurra
Github user kmurra commented on a diff in the pull request:

https://github.com/apache/flink/pull/4746#discussion_r142800631
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala ---
@@ -200,10 +201,30 @@ case class BatchTableTestUtil() extends TableTestUtil {
 printTable(tableEnv.sqlQuery(query))
   }
 
+  def verifyExpressionProjection(fields: Seq[(String, TypeInformation[_])],
--- End diff --

I will move this.


---


[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

2017-10-04 Thread kmurra
Github user kmurra commented on a diff in the pull request:

https://github.com/apache/flink/pull/4746#discussion_r142800611
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CheckExpressionsTableSource.scala ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource}
+import org.apache.flink.types.Row
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+/**
+  * A table source that takes in assertions and applies them when applyPredicate is called.
+  * Allows for testing that expression push-downs are handled properly.
+  * @param typeInfo The type info.
+  * @param assertions A set of assertions as a function reference.
+  * @param pushedDown Whether this has been pushed down.
+  */
+class CheckExpressionsTableSource(typeInfo: RowTypeInfo,
--- End diff --

I'll look at doing that.  It was my initial approach, but when I saw the 
set of test cases it would potentially impact, I decided to do something more 
conservative.  Still doable.
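
A minimal sketch of the assertion-hook idea (the trait name and shape are assumptions, not the PR's exact class; it would be mixed into a batch/stream test table source):

    import java.util.{List => JList}

    import org.apache.flink.table.expressions.Expression
    import org.apache.flink.table.sources.{FilterableTableSource, TableSource}
    import org.apache.flink.types.Row

    import scala.collection.JavaConverters._

    // Sketch: the pushdown hook runs caller-supplied assertions against the Expressions
    // it receives, so a test can check exactly what the planner pushed down.
    trait AssertingFilterable extends FilterableTableSource[Row] {
      this: TableSource[Row] =>

      /** Assertions supplied by the test; each inspects the pushed-down expressions. */
      def assertions: Seq[Seq[Expression] => Unit]

      private var pushedDown = false

      override def isFilterPushedDown: Boolean = pushedDown

      override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = {
        val exprs = predicates.asScala.toList
        assertions.foreach(assertion => assertion(exprs)) // a failed assertion fails the test here
        pushedDown = true
        this // a production source would normally return a copy with the predicates applied
      }
    }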


---


[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

2017-10-04 Thread kmurra
Github user kmurra commented on a diff in the pull request:

https://github.com/apache/flink/pull/4746#discussion_r142800400
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
@@ -103,13 +148,21 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression
     }
   }
 
-  private def dateToCalendar: Calendar = {
+  /**
+    * Convert a date value to a UTC calendar.
+    *
+    * We're assuming that when the user passes in a Date it's constructed from fields,
+    * such as days and hours, and that they want those fields to be in the same timezone as the
+    * Calcite times, which are UTC.  Since we need to convert a Date to a Calendar, that means we
+    * have to shift the epoch millisecond timestamp to account for the difference between UTC and
+    * local time.
+    * @return The Calendar value
+    */
+  private def valueAsUtcCalendar: Calendar = {
     val date = value.asInstanceOf[java.util.Date]
-    val cal = Calendar.getInstance(Literal.GMT)
-    val t = date.getTime
-    // according to Calcite's SqlFunctions.internalToXXX methods
-    cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t))
--- End diff --

The re-implemented adjustCalendar method is functionally the same as this 
when toTz is the UTC TimeZone.  It's just generalized to allow converting 
between arbitrary TimeZones so that I can reuse it in the RexLiteral-to-Expression 
conversion.
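
For readers following the thread, a minimal sketch of what such a generalized helper could look like (assumed shape, not necessarily the PR's exact code):

    import java.util.{Calendar, TimeZone}

    // Shift the calendar's instant so its field values (year, month, day, hour, ...)
    // read the same in toTz as they do in the calendar's own time zone.  With toTz = UTC
    // and a calendar in the default zone this reduces to the old
    // t + TimeZone.getDefault.getOffset(t) logic (ignoring edge cases at DST transitions).
    def adjustCalendar(cal: Calendar, toTz: TimeZone): Calendar = {
      val t = cal.getTimeInMillis
      val adjusted = Calendar.getInstance(toTz)
      adjusted.setTimeInMillis(t + cal.getTimeZone.getOffset(t) - toTz.getOffset(t))
      adjusted
    }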


---


[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

2017-10-04 Thread kmurra
Github user kmurra commented on a diff in the pull request:

https://github.com/apache/flink/pull/4746#discussion_r142799362
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
@@ -49,10 +50,51 @@ object Literal {
     case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
     case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
   }
+
+  private[flink] def apply(rexNode: RexLiteral): Literal = {
+    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)
+
+    val literalValue = literalType match {
+      // Chrono use cases.  We're force-adjusting the UTC-based epoch timestamps to a new
+      // timestamp such that we get the same year/month/hour/day field values in the query's
+      // timezone (UTC)
+      case _@SqlTimeTypeInfo.DATE =>
+        val rexValue = rexNode.getValueAs(classOf[DateString])
+        val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault)
+        new Date(adjustedCal.getTimeInMillis)
--- End diff --

Unfortunately that constructor is deprecated, which is why I avoided using it.
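
For reference, the two options being weighed (standard JDK API, shown only for contrast):

    import java.sql.Date
    import java.util.Calendar

    // Deprecated field-based constructor: year is offset by 1900, month is zero-based.
    val viaDeprecatedCtor = new Date(2017 - 1900, Calendar.SEPTEMBER, 27)

    // Non-deprecated alternative: build a Calendar and use its epoch milliseconds.
    val cal = Calendar.getInstance()
    cal.clear()
    cal.set(2017, Calendar.SEPTEMBER, 27)
    val viaMillis = new Date(cal.getTimeInMillis)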


---


[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

2017-09-28 Thread kmurra
GitHub user kmurra opened a pull request:

https://github.com/apache/flink/pull/4746

[FLINK-7657] [Table] Adding logic to convert RexLiteral to expected SQL 
Date/Time/Timestamp classes

## What is the purpose of the change

This change fixes handling of date, time, and timestamp literal expressions.

A ClassCastException occurs if a date, time, or timestamp literal is 
included in the WHERE clause of a SQL query.  When Calcite trees are converted 
to an Expression, literal values are not properly converted to the expected 
types by the RexNodeToExpressionConverter.

See https://issues.apache.org/jira/browse/FLINK-7657 for more information.
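
An illustrative repro sketch (hypothetical table and column names; it assumes a tableEnv with an orders table backed by a FilterableTableSource, so the literal flows through RexNodeToExpressionConverter during filter pushdown):

    // Before this change, the DATE literal below reached the converter as a raw
    // Calcite value and triggered a ClassCastException when used as a java.sql.Date.
    val result = tableEnv.sqlQuery(
      "SELECT id FROM orders WHERE order_date > DATE '2017-09-01'")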

## Brief change log

- Added a method in the Literal object that takes a RexLiteral and applies 
the appropriate transforms to date- and time-related fields.
- Added a default fallback that preserves the old behavior for other field 
types.
- Note that handling of some other types is incorrect per the public 
documentation, but fixes are out of scope for this JIRA.
- Added comments explaining the handling of values (specifically why we're 
shifting them based on time zones).

## Verifying this change

This change added tests and can be verified as follows:
- Unit test cases were added covering changes
  - Added unit test case to RexProgramExtractorTest.scala to test that 
literal conversions are of the appropriate type.
  - Added unit test case to TableSourceTest.scala to confirm that received 
Expressions in a table source extending from FilterableTableSource do not cause 
exceptions and have expected values
- Note: Other unit test cases cover possible regressions in output values 
from the Literal->RexNode conversion

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kmurra/flink kmurra-expression-timestamp-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4746


commit d8477977694614ed51ad53de55104df10ac55ad4
Author: Kent Murra <ke...@remitly.com>
Date:   2017-09-27T20:48:55Z

[FLINK-7657] Adding logic to convert RexLiteral to expected SQL 
Date/Time/Timestamp classes, preventing ClassCastException




---