Repository: storm
Updated Branches:
  refs/heads/master 502dca194 -> 6162ccdba


http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index f2ac081..19369b4 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.sql.compiler;
 
+import com.google.common.base.Function;
+import org.apache.calcite.avatica.util.ByteString;
 import org.apache.storm.tuple.Values;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
@@ -30,12 +32,17 @@ import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestExprSemantic {
   private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
@@ -60,6 +67,77 @@ public class TestExprSemantic {
   }
 
   @Test
+  public void testDistinctBetweenLikeSimilarIn() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList("TRUE IS DISTINCT FROM TRUE",
+                    "TRUE IS NOT DISTINCT FROM FALSE", "3 BETWEEN 1 AND 5",
+                    "10 NOT BETWEEN 1 AND 5", "'hello' LIKE '_e%'",
+                    "'world' NOT LIKE 'wor%'", "'abc' SIMILAR TO 
'[a-zA-Z]+[cd]{1}'",
+                    "'abe' NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'", "'3' IN ('1', 
'2', '3', '4')",
+                    "2 NOT IN (1, 3, 5)"));
+    assertEquals(new Values(false, false, true, true, true,
+          false, true, true, true, true), v);
+  }
+
+  @Test
+  public void testCaseStatement() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "CASE WHEN 'abcd' IN ('a', 'abc', 'abcde') THEN UPPER('a') 
" +
+                    "WHEN UPPER('abcd') = 'AB' THEN 'b' ELSE {fn 
CONCAT('abcd', '#')} END",
+                    "CASE WHEN 'ab' IN ('a', 'abc', 'abcde') THEN UPPER('a') " 
+
+                    "WHEN UPPER('ab') = 'AB' THEN 'b' ELSE {fn CONCAT('ab', 
'#')} END",
+                    "CASE WHEN 'abc' IN ('a', 'abc', 'abcde') THEN UPPER('a') 
" +
+                    "WHEN UPPER('abc') = 'AB' THEN 'b' ELSE {fn CONCAT('abc', 
'#')} END"
+                    )
+    );
+
+    // TODO: The data type Calcite assigns to literals seems to be unexpected. Please see the logical plan below.
+    // LogicalProject(EXPR$0=[CASE(OR(=('abcd', 'a'), =('abcd', 'abc'), 
=('abcd', 'abcde')), CAST(UPPER('a')):VARCHAR(5) CHARACTER SET "ISO-8859-1" 
COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abcd'), 
CAST('AB'):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL), 'b', CAST(||('abcd', '#')):VARCHAR(5) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], 
EXPR$1=[CASE(OR(=('ab', 'a'), =('ab', 'abc'), =('ab', 'abcde')), 
CAST(UPPER('a')):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('ab'), 'AB'), CAST('b'):CHAR(3) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, 
||('ab', '#'))], EXPR$2=[CASE(OR(=('abc', 'a'), =('abc', 'abc'), =('abc', 
'abcde')), CAST(UPPER('a')):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abc'), CAST('AB'):CHAR(3) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 
CAST('b'):CHAR(4) C
 HARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, 
||('abc', '#'))]): rowcount = 1.0, cumulative cost = {2.0 rows, 5.0 cpu, 0.0 
io}, id = 5
+    //   LogicalFilter(condition=[AND(>($0, 0), <($0, 2))]): rowcount = 1.0, 
cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 4
+    //     EnumerableTableScan(table=[[FOO]]): rowcount = 1.0, cumulative cost 
= {0.0 rows, 1.0 cpu, 0.0 io}, id = 3
+    // As a result, both 'b' and UPPER('a') (hence 'A') contain extra spaces, which is not expected.
+    // When we use CASE with an actual column (Java String type, hence VARCHAR), it seems to work as expected.
+    // Please refer to trident/TestPlanCompiler#testCaseStatement(), and see the logical plan below.
+    // LogicalProject(EXPR$0=[CASE(OR(=($1, 'a'), =($1, 'abc'), =($1, 
'abcde')), CAST(UPPER('a')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary", =(CAST(UPPER($1)):VARCHAR(2) CHARACTER SET 
"ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", 'AB'), 'b', CAST(||($1, 
'#')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary")]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 
cpu, 0.0 io}, id = 3
+    List<Object> v2 = Lists.transform(v, new Function<Object, Object>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable Object o) {
+        return ((String) o).trim();
+      }
+    });
+    assertArrayEquals(new Values("abcd#", "b", "A").toArray(), v2.toArray());
+  }
+
+  @Test
+  public void testNullIfAndCoalesce() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "NULLIF(5, 5)", "NULLIF(5, 0)", "COALESCE(NULL, NULL, 5, 
4, NULL)", "COALESCE(1, 5)"
+            ));
+    assertEquals(new Values(null, 5, 5, 1), v);
+  }
+
+  @Test
+  public void testCollectionFunctions() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "ELEMENT(ARRAY[3])", "CARDINALITY(ARRAY[1, 2, 3, 4, 5])"
+            ));
+    assertEquals(new Values(3, 5), v);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testElementFunctionMoreThanOneValue() throws Exception {
+    testExpr(
+            Lists.newArrayList(
+                    "ELEMENT(ARRAY[1, 2, 3])"
+            ));
+    fail("ELEMENT with array which has multiple elements should throw 
exception in runtime.");
+  }
+
+  @Test
   public void testArithmeticWithNull() throws Exception {
     Values v = testExpr(
       Lists.newArrayList(
@@ -131,14 +209,188 @@ public class TestExprSemantic {
   }
 
   @Test
-  public void testStringMethods() throws Exception {
+  public void testArithmeticFunctions() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "POWER(3, 2)", "ABS(-10)", "MOD(10, 3)", "MOD(-10, 3)",
+                    "CEIL(123.45)", "FLOOR(123.45)"
+            ));
+
+    assertEquals(new Values(9.0d, 10, 1, -1, new BigDecimal(124), new 
BigDecimal(123)), v);
+
+    // The values below are floating-point numbers, so comparing them with literals is likely to fail.
+    // Compare only their integer parts instead.
+    Values v2 = testExpr(
+            Lists.newArrayList(
+                    "SQRT(255)", "LN(16)", "LOG10(10000)", "EXP(10)"
+            ));
+    List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
+      @Nullable
+      @Override
+      public Object apply(@Nullable Object o) {
+        // only takes int value
+        return ((Number) o).intValue();
+      }
+    });
+
+    // 15.9687, 2.7725, 4.0, 22026.465794
+    assertEquals(new Values(15, 2, 4, 22026), v2m);
+  }
+
+  @Test
+  public void testStringFunctions() throws Exception {
     Values v = testExpr(
         Lists.newArrayList(
-            "UPPER('a')", "LOWER('A')", "INITCAP('foo')",
-            "SUBSTRING('foo', 2)", "CHARACTER_LENGTH('foo')", 
"CHAR_LENGTH('foo')",
-            "'ab' || 'cd'"
+                "'ab' || 'cd'", "CHAR_LENGTH('foo')", 
"CHARACTER_LENGTH('foo')",
+                "UPPER('a')", "LOWER('A')", "POSITION('bc' IN 'abcd')",
+                "TRIM(BOTH ' ' FROM '  abcdeabcdeabc  ')",
+                "TRIM(LEADING ' ' FROM '  abcdeabcdeabc  ')",
+                "TRIM(TRAILING ' ' FROM '  abcdeabcdeabc  ')",
+                "OVERLAY('abcde' PLACING 'bc' FROM 3)",
+                "SUBSTRING('abcde' FROM 3)", "SUBSTRING('abcdeabcde' FROM 3 
FOR 4)",
+                "INITCAP('foo')"
         ));
-    assertEquals(new Values("A", "a", "Foo", "oo", 3, 3, "abcd"), v);
+    assertEquals(new Values("abcd", 3, 3, "A", "a", 2, "abcdeabcdeabc", 
"abcdeabcdeabc  ", "  abcdeabcdeabc", "abbce", "cde", "cdea", "Foo"), v);
+  }
+
+  @Test
+  public void testBinaryStringFunctions() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "x'45F0AB' || x'45F0AB'",
+                    "POSITION(x'F0' IN x'453423F0ABBC')",
+                    "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3)"
+                    // "SUBSTRING(x'453423F0ABBC' FROM 3)",
+                    // "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4)"
+            ));
+
+    // TODO: Calcite 1.9.0 has bugs in the binary SUBSTRING functions,
+    // as there is no SqlFunctions.substring(org.apache.calcite.avatica.util.ByteString, ...),
+    // so the SUBSTRING test cases are commented out.
+
+    assertEquals("45f0ab45f0ab", v.get(0).toString());
+    assertEquals(4, v.get(1));
+    assertEquals("45344534abbc45", v.get(2).toString());
+    // assertEquals("23f0abbc", v.get(3).toString());
+    // assertEquals("23f0ab", v.get(4).toString());
+  }
+
+  @Test
+  public void testDateAndTimestampLiteral() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "DATE '1970-05-15' AS datefield",
+                    "TIME '00:00:00' AS timefield",
+                    "TIMESTAMP '2016-01-01 00:00:00' as timestampfield"
+            )
+    );
+
+    assertEquals(3, v.size());
+    assertEquals(134, v.get(0));
+    assertEquals(0, v.get(1));
+    assertEquals(1451606400000L, v.get(2));
+  }
+
+  @Test
+  public void testInterval() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "INTERVAL '1-5' YEAR TO MONTH AS intervalfield",
+                    "(DATE '1970-01-01', DATE '1970-01-15') AS 
anchoredinterval_field"
+            )
+    );
+
+    assertEquals(3, v.size());
+    assertEquals(17, v.get(0));
+    assertEquals(0, v.get(1));
+    assertEquals(14, v.get(2));
+  }
+
+  @Test
+  public void testDateFunctions() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "LOCALTIME = CURRENT_TIME, LOCALTIMESTAMP = 
CURRENT_TIMESTAMP, CURRENT_DATE",
+                    "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')",
+                    "FLOOR(DATE '2016-01-23' TO MONTH)",
+                    "CEIL(TIME '12:34:56' TO MINUTE)"
+            )
+    );
+
+    assertEquals(6, v.size());
+    assertTrue((boolean) v.get(0));
+    assertTrue((boolean) v.get(1));
+    // Skip checking CURRENT_DATE: we don't inject a dataContext here, so the current timestamp is unknown.
+    // It can be verified in the Trident test instead.
+    assertEquals(1L, v.get(3));
+    assertEquals(0L, v.get(4));
+    assertEquals(45300000, v.get(5));
+  }
+
+  @Test
+  public void testJDBCNumericFunctions() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "{fn POWER(3, 2)}", "{fn ABS(-10)}", "{fn MOD(10, 3)}", 
"{fn MOD(-10, 3)}"
+            ));
+
+    assertEquals(new Values(9.0d, 10, 1, -1), v);
+
+    // The values below are floating-point numbers, so comparing them with literals is likely to fail.
+    // Compare only their integer parts instead.
+    Values v2 = testExpr(
+            Lists.newArrayList(
+                    "{fn LOG(16)}", "{fn LOG10(10000)}", "{fn EXP(10)}"
+            ));
+    List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
+      @Nullable
+      @Override
+      public Object apply(@Nullable Object o) {
+        // only takes int value
+        return ((Number) o).intValue();
+      }
+    });
+
+    // 2.7725, 4.0, 22026.465794
+    assertEquals(new Values(2, 4, 22026), v2m);
+  }
+
+  @Test
+  public void testJDBCStringFunctions() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "{fn CONCAT('ab', 'cd')}",
+                    "{fn LOCATE('bc', 'abcdeabcde')}",
+                    //"{fn LOCATE('bc', 'abcdeabcde', 4)}",
+                    "{fn INSERT('abcd', 2, 3, 'de')}",
+                    "{fn LCASE('AbCdE')}",
+                    "{fn LENGTH('AbCdE')}",
+                    //"{fn LTRIM('  abcde  ')}",
+                    //"{fn RTRIM('  abcde  ')}",
+                    "{fn SUBSTRING('abcdeabcde', 3, 4)}",
+                    "{fn UCASE('AbCdE')}"
+            )
+    );
+
+    // TODO: Calcite 1.9.0 doesn't support {fn LOCATE(string1, string2 [, integer])}
+    // even though it is on the supported list in the SQL reference,
+    // and LTRIM and RTRIM are buggy: they throw "AssertionError: Internal error: pre-condition failed: pos != null".
+    // The problematic function tests are commented out.
+
+    assertEquals(new Values("abcd", 2, "ade", "abcde", 5, "cdea", "ABCDE"), v);
+  }
+
+  @Test
+  public void testJDBCDateTimeFunctions() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "{fn CURDATE()} = CURRENT_DATE", "{fn CURTIME()} = 
LOCALTIME", "{fn NOW()} = LOCALTIMESTAMP",
+                    "{fn QUARTER(DATE '2016-10-07')}", "{fn 
TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}",
+                    "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 
00:00:00', TIMESTAMP '2016-10-07 00:00:00')}"
+            )
+    );
+
+    assertEquals(new Values(true, true, true, 4L, 1475799300000L, 86400), v);
   }
 
   private Values testExpr(List<String> exprs) throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
index 547114f..bd5bde9 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -71,9 +71,9 @@ public class TestPlanCompiler {
 
   @Test
   public void testNested() throws Exception {
-    String sql = "SELECT ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+    String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
             "FROM FOO " +
-            "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 200";
+            "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverNestedTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
     AbstractValuesProcessor proc = compiler.compile(state.tree());
@@ -84,7 +84,7 @@ public class TestPlanCompiler {
     proc.initialize(data, h);
     Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
     Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-    Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 
300)), values.get(0));
+    Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 
300)), values.get(0));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
index f40e138..f0eed27 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
@@ -59,7 +59,7 @@ public class TestRelNodeCompiler {
       // standalone mode doesn't use inputstreams argument
       compiler.visitProject(project, Collections.EMPTY_LIST);
       pw.flush();
-      Assert.assertThat(sw.toString(), containsString("plus("));
+      Assert.assertThat(sw.toString(), containsString(" + 1"));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 85510a0..c1ceeba 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -19,8 +19,12 @@
  */
 package org.apache.storm.sql.compiler.backends.trident;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.storm.sql.runtime.calcite.StormDataContext;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.commons.collections4.CollectionUtils;
@@ -40,6 +44,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +56,7 @@ import static 
org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
 public class TestPlanCompiler {
   private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
           RelDataTypeSystem.DEFAULT);
+  private final DataContext dataContext = new StormDataContext();
 
   @Before
   public void setUp() {
@@ -63,7 +70,7 @@ public class TestPlanCompiler {
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentDataSource());
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -74,13 +81,37 @@ public class TestPlanCompiler {
   }
 
   @Test
+  public void testSubquery() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 1;
+    // TODO: add a visit method for LogicalValues in PostOrderRelNodeVisitor and handle it properly
+    // String sql = "SELECT ID FROM FOO WHERE ID IN (SELECT 2)";
+
+    // TODO: the subquery below doesn't work, but the join query below with the subquery as a table works.
+    // They produce different logical plans (the former is more complicated), so there is room to apply rules to improve it.
+    // String sql = "SELECT ID FROM FOO WHERE ID IN (SELECT ID FROM FOO WHERE 
NAME = 'abc')";
+
+    String sql = "SELECT F.ID FROM FOO AS F JOIN (SELECT ID FROM FOO WHERE 
NAME = 'abc') AS F2 ON F.ID = F2.ID";
+    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
+    final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(),
+            f, new TestUtils.MockStateUpdater(), new Fields());
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(2)}, 
getCollectedValues().toArray());
+  }
+
+  @Test
   public void testCompileGroupByExp() throws Exception {
     final int EXPECTED_VALUE_SIZE = 1;
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource());
     String sql = "SELECT GRPID, COUNT(*) AS CNT, MAX(AGE) AS MAX_AGE, MIN(AGE) 
AS MIN_AGE, AVG(AGE) AS AVG_AGE, MAX(AGE) - MIN(AGE) AS DIFF FROM FOO GROUP BY 
GRPID";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyGroupByTable(sql);
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -98,7 +129,7 @@ public class TestPlanCompiler {
     data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource());
     String sql = "SELECT GRPID, COUNT(*) AS CNT, MAX(SCORE - AGE) AS 
MAX_SCORE_MINUS_AGE FROM FOO GROUP BY GRPID";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyGroupByTable(sql);
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -117,7 +148,7 @@ public class TestPlanCompiler {
     data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
     String sql = "SELECT d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS d 
ON e.DEPTID = d.DEPTID WHERE e.EMPID > 0 GROUP BY d.DEPTID";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -136,7 +167,7 @@ public class TestPlanCompiler {
     data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
     String sql = "SELECT d.DEPTID, e.DEPTID FROM DEPT AS d LEFT OUTER JOIN EMP 
AS e ON d.DEPTID = e.DEPTID WHERE e.EMPID is null";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -155,7 +186,7 @@ public class TestPlanCompiler {
     data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
     String sql = "SELECT d.DEPTID, e.DEPTID FROM EMP AS e RIGHT OUTER JOIN 
DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.EMPID is null";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -174,7 +205,7 @@ public class TestPlanCompiler {
     data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
     String sql = "SELECT e.DEPTID, d.DEPTNAME FROM EMP AS e FULL OUTER JOIN 
DEPT AS d ON e.DEPTID = d.DEPTID WHERE (d.DEPTNAME is null OR e.EMPNAME is 
null)";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -195,28 +226,11 @@ public class TestPlanCompiler {
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentDataSource());
     data.put("BAR", new TestUtils.MockSqlTridentDataSource());
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
-    Assert.assertArrayEquals(new Values[] { new Values(4, "x", "y")}, 
getCollectedValues().toArray());
-  }
-
-  @Test
-  public void testLogicalExpr() throws Exception {
-    final int EXPECTED_VALUE_SIZE = 1;
-    String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND 
ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    Map<String, ISqlTridentDataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
-    AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
-    final TridentTopology topo = proc.build(data);
-    Fields f = proc.outputStream().getOutputFields();
-    proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(),
-            f, new TestUtils.MockStateUpdater(), new Fields());
-    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
-    Assert.assertArrayEquals(new Values[] { new Values(true, false, true) }, 
getCollectedValues().toArray());
+    Assert.assertArrayEquals(new Values[] { new Values(4, "abcde", "y")}, 
getCollectedValues().toArray());
   }
 
   @Test
@@ -228,7 +242,7 @@ public class TestPlanCompiler {
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
     Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentDataSource());
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -245,7 +259,7 @@ public class TestPlanCompiler {
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyGroupByTable(sql);
     Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource());
-    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
     AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
@@ -255,6 +269,79 @@ public class TestPlanCompiler {
     Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 15L, 15L) }, 
getCollectedValues().toArray());
   }
 
+  @Test
+  public void testCaseStatement() throws Exception {
+    int EXPECTED_VALUE_SIZE = 5;
+    String sql = "SELECT CASE WHEN NAME IN ('a', 'abc', 'abcde') THEN 
UPPER('a') " +
+            "WHEN UPPER(NAME) = 'AB' THEN 'b' ELSE {fn CONCAT(NAME, '#')} END 
FROM FOO";
+    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
+
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
+    final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, 
new TestUtils.MockStateUpdater(), new Fields());
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Assert.assertArrayEquals(new Values[]{new Values("A"), new Values("b"), 
new Values("A"), new Values("abcd#"), new Values("A")}, 
getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testNested() throws Exception {
+    int EXPECTED_VALUE_SIZE = 1;
+    String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+            "FROM FOO " +
+            "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverNestedTable(sql);
+
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
+    final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, 
new TestUtils.MockStateUpdater(), new Fields());
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+    Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, 
Arrays.asList(100, 200, 300))}, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testDateKeywords() throws Exception {
+    int EXPECTED_VALUE_SIZE = 1;
+    String sql = "SELECT " +
+            "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, 
CURRENT_DATE " +
+            "FROM FOO " +
+            "WHERE ID > 0 AND ID < 2";
+    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
+
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext);
+    final AbstractTridentProcessor proc = 
compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, 
new TestUtils.MockStateUpdater(), new Fields());
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    long utcTimestamp = (long) 
dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName);
+    long currentTimestamp = (long) 
dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
+    long localTimestamp = (long) 
dataContext.get(DataContext.Variable.LOCAL_TIMESTAMP.camelName);
+
+    System.out.println(getCollectedValues());
+
+    java.sql.Timestamp timestamp = new java.sql.Timestamp(utcTimestamp);
+    int dateInt = (int) 
timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toLocalDate().toEpochDay();
+    int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY);
+    int currentTimeInt = (int) (currentTimestamp % 
DateTimeUtils.MILLIS_PER_DAY);
+
+    Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, 
currentTimeInt, localTimestamp, currentTimestamp, dateInt)}, 
getCollectedValues().toArray());
+  }
+
   private void runTridentTopology(final int expectedValueSize, 
AbstractTridentProcessor proc,
                                   TridentTopology topo) throws Exception {
     final Config conf = new Config();

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml 
b/external/sql/storm-sql-runtime/pom.xml
index d911f9b..b02bac5 100644
--- a/external/sql/storm-sql-runtime/pom.xml
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -56,14 +56,6 @@
                     <artifactId>jsr305</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.codehaus.janino</groupId>
-                    <artifactId>janino</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.janino</groupId>
-                    <artifactId>commons-compiler</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>org.pentaho</groupId>
                     <artifactId>pentaho-aggdesigner-algorithm</artifactId>
                 </exclusion>
@@ -77,11 +69,6 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
-        <!-- janino -->
-        <dependency>
-            <groupId>org.codehaus.janino</groupId>
-            <artifactId>janino</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
new file mode 100644
index 0000000..aa7e435
--- /dev/null
+++ 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+
+import java.io.Serializable;
+
+/**
+ * Subclass of Calcite's interpreter {@link Context} that is additionally
+ * marked {@link Serializable}.
+ *
+ * <p>This is a hack to use Calcite Context: the class is declared in the
+ * {@code org.apache.calcite.interpreter} package, presumably because the
+ * {@code Context} constructor is not accessible from other packages
+ * (TODO confirm against the Calcite version in use).
+ */
+public class StormContext extends Context implements Serializable {
+    /**
+     * Creates a context on top of the given data context.
+     *
+     * @param root data context passed unchanged to the {@code Context} constructor
+     */
+    public StormContext(DataContext root) {
+        super(root);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
new file mode 100644
index 0000000..4861b43
--- /dev/null
+++ 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.calcite;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Holder;
+
+import java.io.Serializable;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * Minimal {@link DataContext} that only exposes the time-related variables
+ * (UTC, current and local timestamps plus the time zone), captured once at
+ * construction time.
+ *
+ * <p>This is based on SlimDataContext in Calcite, and borrows some code from
+ * DataContextImpl in Calcite.
+ */
+public class StormDataContext implements DataContext, Serializable {
+    // Immutable lookup table: variable camelName -> value, filled in the constructor.
+    private final ImmutableMap<Object, Object> map;
+
+    /**
+     * Captures the current time and the default calendar's time zone and
+     * stores the derived timestamp variables for later lookup via {@link #get}.
+     */
+    public StormDataContext() {
+        // Store the time at which the query started executing. The SQL
+        // standard says that functions such as CURRENT_TIMESTAMP return the
+        // same value throughout the query.
+
+        final Holder<Long> timeHolder = Holder.of(System.currentTimeMillis());
+
+        // Give a hook a chance to alter the clock.
+        Hook.CURRENT_TIME.run(timeHolder);
+        final long time = timeHolder.get();
+        // Time zone of the default Calendar instance; its offset at 'time'
+        // is added to the UTC time below.
+        final TimeZone timeZone = Calendar.getInstance().getTimeZone();
+        final long localOffset = timeZone.getOffset(time);
+        // The current offset is simply an alias of the local offset, so
+        // CURRENT_TIMESTAMP and LOCAL_TIMESTAMP end up with the same value.
+        final long currentOffset = localOffset;
+
+        ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder();
+        builder.put(Variable.UTC_TIMESTAMP.camelName, time)
+                .put(Variable.CURRENT_TIMESTAMP.camelName, time + 
currentOffset)
+                .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset)
+                .put(Variable.TIME_ZONE.camelName, timeZone);
+        map = builder.build();
+    }
+
+    /**
+     * Not supported by this context.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public SchemaPlus getRootSchema() {
+        return null;
+    }
+
+    /**
+     * Not supported by this context.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public JavaTypeFactory getTypeFactory() {
+        return null;
+    }
+
+    /**
+     * Not supported by this context.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public QueryProvider getQueryProvider() {
+        return null;
+    }
+
+    /**
+     * Looks up one of the variables stored by the constructor.
+     *
+     * @param name variable name, e.g. {@code Variable.UTC_TIMESTAMP.camelName}
+     * @return the stored value, or {@code null} if nothing is stored under {@code name}
+     */
+    @Override
+    public Object get(String name) {
+        return map.get(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
index 7afa096..9aa8b76 100644
--- 
a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -18,6 +18,9 @@
  */
 package org.apache.storm.sql.runtime.trident.functions;
 
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
 import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.operation.TridentOperationContext;
 import org.apache.storm.trident.tuple.TridentTuple;
@@ -27,25 +30,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.List;
 import java.util.Map;
 
 public class EvaluationFilter extends BaseFilter {
     private static final Logger LOG = 
LoggerFactory.getLogger(EvaluationFilter.class);
 
     private transient ScriptEvaluator evaluator;
+
     private final String expression;
+    private final DataContext dataContext;
+    private final Object[] outputValues;
 
-    public EvaluationFilter(String expression) {
+    public EvaluationFilter(String expression, DataContext dataContext) {
         this.expression = expression;
+        this.dataContext = dataContext;
+        this.outputValues = new Object[1];
     }
 
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
         LOG.info("Expression: {}", expression);
         try {
-            evaluator = new ScriptEvaluator(expression, Boolean.class,
-                    new String[] {"_data"}, new Class[] { List.class });
+            evaluator = new ScriptEvaluator(expression, int.class,
+                    new String[] {"context", "outputValues"},
+                    new Class[] { Context.class, Object[].class });
         } catch (CompileException e) {
             throw new RuntimeException(e);
         }
@@ -54,7 +62,10 @@ public class EvaluationFilter extends BaseFilter {
     @Override
     public boolean isKeep(TridentTuple tuple) {
         try {
-            return (Boolean) evaluator.evaluate(new Object[] 
{tuple.getValues()});
+            Context calciteContext = new StormContext(dataContext);
+            calciteContext.values = tuple.getValues().toArray();
+            evaluator.evaluate(new Object[] {calciteContext, outputValues});
+            return (outputValues[0] != null && (boolean) outputValues[0]);
         } catch (InvocationTargetException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
index b0bbce3..4a48766 100644
--- 
a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ 
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -18,6 +18,9 @@
  */
 package org.apache.storm.sql.runtime.trident.functions;
 
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
 import org.apache.storm.trident.operation.BaseFunction;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.operation.TridentOperationContext;
@@ -29,25 +32,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.List;
 import java.util.Map;
 
 public class EvaluationFunction extends BaseFunction {
     private static final Logger LOG = 
LoggerFactory.getLogger(EvaluationFunction.class);
 
     private transient ScriptEvaluator evaluator;
+
     private final String expression;
+    private final Object[] outputValues;
+    private final DataContext dataContext;
 
-    public EvaluationFunction(String expression) {
+    public EvaluationFunction(String expression, int outputCount, DataContext 
dataContext) {
         this.expression = expression;
+        this.outputValues = new Object[outputCount];
+        this.dataContext = dataContext;
     }
 
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
         LOG.info("Expression: {}", expression);
         try {
-            evaluator = new ScriptEvaluator(expression, Values.class,
-                    new String[] {"_data"}, new Class[] { List.class });
+            evaluator = new ScriptEvaluator(expression, int.class,
+                    new String[] {"context", "outputValues"},
+                    new Class[] { Context.class, Object[].class });
         } catch (CompileException e) {
             throw new RuntimeException(e);
         }
@@ -56,7 +64,11 @@ public class EvaluationFunction extends BaseFunction {
     @Override
     public void execute(TridentTuple tuple, TridentCollector collector) {
         try {
-            collector.emit((Values) evaluator.evaluate(new Object[] 
{tuple.getValues()}));
+            Context calciteContext = new StormContext(dataContext);
+            calciteContext.values = tuple.getValues().toArray();
+            evaluator.evaluate(
+                    new Object[]{calciteContext, outputValues});
+            collector.emit(new Values(outputValues));
         } catch (InvocationTargetException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java 
b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 7648f58..0a3bac6 100644
--- 
a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ 
b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -19,26 +19,22 @@
  */
 package org.apache.storm.sql;
 
-import org.apache.storm.ILocalCluster;
-import org.apache.storm.LocalCluster;
+import org.apache.storm.sql.runtime.ChannelContext;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
 import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.spout.IBatchSpout;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.sql.runtime.ChannelContext;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -264,9 +260,11 @@ public class TestUtils {
       private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
 
       public MockSpout() {
-        for (int i = 0; i < 5; ++i) {
-          RECORDS.add(new Values(i, "x", "y"));
-        }
+        RECORDS.add(new Values(0, "a", "y"));
+        RECORDS.add(new Values(1, "ab", "y"));
+        RECORDS.add(new Values(2, "abc", "y"));
+        RECORDS.add(new Values(3, "abcd", "y"));
+        RECORDS.add(new Values(4, "abcde", "y"));
       }
 
       private boolean emitted = false;
@@ -487,6 +485,71 @@ public class TestUtils {
     }
   }
 
+  /**
+   * Mock Trident data source whose producer emits records containing nested
+   * values (a map, a map of maps and a list) and whose consumer is built from
+   * the mock state factory and state updater.
+   */
+  public static class MockSqlTridentNestedDataSource implements 
ISqlTridentDataSource {
+    @Override
+    public IBatchSpout getProducer() {
+      return new MockSpout();
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      return new SimpleSqlTridentConsumer(new MockStateFactory(), new 
MockStateUpdater());
+    }
+
+    /**
+     * Batch spout that emits five pre-built records on the first
+     * {@code emitBatch} call and nothing on later calls.
+     */
+    private static class MockSpout implements IBatchSpout {
+      private final ArrayList<Values> RECORDS = new ArrayList<>();
+      private final Fields OUTPUT_FIELDS = new Fields("ID", "MAPFIELD", 
"NESTEDMAPFIELD", "ARRAYFIELD");
+
+      /**
+       * Builds the records for i = 0..4: ID is i, MAPFIELD is {b: i, c: i*i},
+       * NESTEDMAPFIELD is {a: <that same map instance>} and ARRAYFIELD is the
+       * shared list [100, 200, 300].
+       */
+      public MockSpout() {
+        List<Integer> ints = Arrays.asList(100, 200, 300);
+        for (int i = 0; i < 5; ++i) {
+          Map<String, Integer> map = new HashMap<>();
+          map.put("b", i);
+          map.put("c", i*i);
+          Map<String, Map<String, Integer>> mm = new HashMap<>();
+          mm.put("a", map);
+          RECORDS.add(new Values(i, map, mm, ints));
+        }
+      }
+
+      // Set once all records have been emitted so later batches stay empty.
+      private boolean emitted = false;
+
+      @Override
+      public void open(Map conf, TopologyContext context) {
+      }
+
+      /**
+       * Emits every record on the first invocation; subsequent invocations
+       * emit nothing. The batch id is ignored.
+       */
+      @Override
+      public void emitBatch(long batchId, TridentCollector collector) {
+        if (emitted) {
+          return;
+        }
+
+        for (Values r : RECORDS) {
+          collector.emit(r);
+        }
+        emitted = true;
+      }
+
+      @Override
+      public void ack(long batchId) {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      // No component-specific configuration is supplied.
+      @Override
+      public Map<String, Object> getComponentConfiguration() {
+        return null;
+      }
+
+      @Override
+      public Fields getOutputFields() {
+        return OUTPUT_FIELDS;
+      }
+    }
+  }
+
   public static class CollectDataChannelHandler implements ChannelHandler {
     private final List<Values> values;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 28ba98e..b44a8e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -264,8 +264,9 @@
         <junit.version>4.11</junit.version>
         <metrics-clojure.version>2.5.1</metrics-clojure.version>
         <hdrhistogram.version>2.1.7</hdrhistogram.version>
-        <calcite.version>1.4.0-incubating</calcite.version>
-        <janino.version>2.7.8</janino.version>
+
+        <calcite.version>1.10.0</calcite.version>
+
         <jackson.version>2.6.3</jackson.version>
         <maven-surefire.version>2.18.1</maven-surefire.version>
         <!-- Kafka version used by old storm-kafka spout code -->
@@ -908,12 +909,6 @@
                 <artifactId>calcite-core</artifactId>
                 <version>${calcite.version}</version>
             </dependency>
-            <!-- used by storm-sql-core and storm-sql-runtime -->
-            <dependency>
-                <groupId>org.codehaus.janino</groupId>
-                <artifactId>janino</artifactId>
-                <version>${janino.version}</version>
-            </dependency>
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-databind</artifactId>

Reply via email to