Repository: storm Updated Branches: refs/heads/master 502dca194 -> 6162ccdba
http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java index f2ac081..19369b4 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java @@ -17,6 +17,8 @@ */ package org.apache.storm.sql.compiler; +import com.google.common.base.Function; +import org.apache.calcite.avatica.util.ByteString; import org.apache.storm.tuple.Values; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -30,12 +32,17 @@ import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.AbstractValuesProcessor; import org.junit.Test; +import javax.annotation.Nullable; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestExprSemantic { private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl( @@ -60,6 +67,77 @@ public class TestExprSemantic { } @Test + public void testDistinctBetweenLikeSimilarIn() throws Exception { + Values v = testExpr( + Lists.newArrayList("TRUE IS DISTINCT FROM TRUE", + "TRUE IS NOT DISTINCT FROM FALSE", "3 BETWEEN 1 AND 5", + "10 NOT BETWEEN 1 AND 5", "'hello' LIKE '_e%'", + "'world' NOT LIKE 'wor%'", "'abc' SIMILAR TO '[a-zA-Z]+[cd]{1}'", + "'abe' NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'", "'3' IN ('1', '2', '3', '4')", + "2 NOT IN (1, 3, 5)")); + assertEquals(new Values(false, false, true, true, true, + false, true, true, true, true), v); + } + + @Test + public void testCaseStatement() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "CASE WHEN 'abcd' IN ('a', 'abc', 'abcde') THEN UPPER('a') " + + "WHEN UPPER('abcd') = 'AB' THEN 'b' ELSE {fn CONCAT('abcd', '#')} END", + "CASE WHEN 'ab' IN ('a', 'abc', 'abcde') THEN UPPER('a') " + + "WHEN UPPER('ab') = 'AB' THEN 'b' ELSE {fn CONCAT('ab', '#')} END", + "CASE WHEN 'abc' IN ('a', 'abc', 'abcde') THEN UPPER('a') " + + "WHEN UPPER('abc') = 'AB' THEN 'b' ELSE {fn CONCAT('abc', '#')} END" + ) + ); + + // TODO: The data type of literal Calcite assigns seems to be out of expectation. Please see below logical plan. + // LogicalProject(EXPR$0=[CASE(OR(=('abcd', 'a'), =('abcd', 'abc'), =('abcd', 'abcde')), CAST(UPPER('a')):VARCHAR(5) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abcd'), CAST('AB'):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 'b', CAST(||('abcd', '#')):VARCHAR(5) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], EXPR$1=[CASE(OR(=('ab', 'a'), =('ab', 'abc'), =('ab', 'abcde')), CAST(UPPER('a')):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('ab'), 'AB'), CAST('b'):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, ||('ab', '#'))], EXPR$2=[CASE(OR(=('abc', 'a'), =('abc', 'abc'), =('abc', 'abcde')), CAST(UPPER('a')):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abc'), CAST('AB'):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), CAST('b'):CHAR(4) C HARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, ||('abc', '#'))]): rowcount = 1.0, cumulative cost = {2.0 rows, 5.0 cpu, 0.0 io}, id = 5 + // LogicalFilter(condition=[AND(>($0, 0), <($0, 2))]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 4 + // EnumerableTableScan(table=[[FOO]]): rowcount = 1.0, cumulative cost = {0.0 rows, 1.0 cpu, 0.0 io}, id = 3 + // in result, both 'b' and UPPER('a') hence 'A' are having some spaces which is not expected. + // When we use CASE with actual column (Java String type hence VARCHAR), it seems to work as expected. + // Please refer trident/TestPlanCompiler#testCaseStatement(), and see below logical plan. + // LogicalProject(EXPR$0=[CASE(OR(=($1, 'a'), =($1, 'abc'), =($1, 'abcde')), CAST(UPPER('a')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", =(CAST(UPPER($1)):VARCHAR(2) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", 'AB'), 'b', CAST(||($1, '#')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary")]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 3 + List<Object> v2 = Lists.transform(v, new Function<Object, Object>() { + @Nullable + @Override + public String apply(@Nullable Object o) { + return ((String) o).trim(); + } + }); + assertArrayEquals(new Values("abcd#", "b", "A").toArray(), v2.toArray()); + } + + @Test + public void testNullIfAndCoalesce() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "NULLIF(5, 5)", "NULLIF(5, 0)", "COALESCE(NULL, NULL, 5, 4, NULL)", "COALESCE(1, 5)" + )); + assertEquals(new Values(null, 5, 5, 1), v); + } + + @Test + public void testCollectionFunctions() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "ELEMENT(ARRAY[3])", "CARDINALITY(ARRAY[1, 2, 3, 4, 5])" + )); + assertEquals(new Values(3, 5), v); + } + + @Test(expected = RuntimeException.class) + public void testElementFunctionMoreThanOneValue() throws Exception { + testExpr( + Lists.newArrayList( + "ELEMENT(ARRAY[1, 2, 3])" + )); + fail("ELEMENT with array which has multiple elements should throw exception in runtime."); + } + + @Test public void testArithmeticWithNull() throws Exception { Values v = testExpr( Lists.newArrayList( @@ -131,14 +209,188 @@ public class TestExprSemantic { } @Test - public void testStringMethods() throws Exception { + public void testArithmeticFunctions() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "POWER(3, 2)", "ABS(-10)", "MOD(10, 3)", "MOD(-10, 3)", + "CEIL(123.45)", "FLOOR(123.45)" + )); + + assertEquals(new Values(9.0d, 10, 1, -1, new BigDecimal(124), new BigDecimal(123)), v); + + // Belows are floating numbers so comparing this with literal is tend to be failing... + // Picking int value and compare + Values v2 = testExpr( + Lists.newArrayList( + "SQRT(255)", "LN(16)", "LOG10(10000)", "EXP(10)" + )); + List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() { + @Nullable + @Override + public Object apply(@Nullable Object o) { + // only takes int value + return ((Number) o).intValue(); + } + }); + + // 15.9687, 2.7725, 4.0, 22026.465794 + assertEquals(new Values(15, 2, 4, 22026), v2m); + } + + @Test + public void testStringFunctions() throws Exception { Values v = testExpr( Lists.newArrayList( - "UPPER('a')", "LOWER('A')", "INITCAP('foo')", - "SUBSTRING('foo', 2)", "CHARACTER_LENGTH('foo')", "CHAR_LENGTH('foo')", - "'ab' || 'cd'" + "'ab' || 'cd'", "CHAR_LENGTH('foo')", "CHARACTER_LENGTH('foo')", + "UPPER('a')", "LOWER('A')", "POSITION('bc' IN 'abcd')", + "TRIM(BOTH ' ' FROM ' abcdeabcdeabc ')", + "TRIM(LEADING ' ' FROM ' abcdeabcdeabc ')", + "TRIM(TRAILING ' ' FROM ' abcdeabcdeabc ')", + "OVERLAY('abcde' PLACING 'bc' FROM 3)", + "SUBSTRING('abcde' FROM 3)", "SUBSTRING('abcdeabcde' FROM 3 FOR 4)", + "INITCAP('foo')" )); - assertEquals(new Values("A", "a", "Foo", "oo", 3, 3, "abcd"), v); + assertEquals(new Values("abcd", 3, 3, "A", "a", 2, "abcdeabcdeabc", "abcdeabcdeabc ", " abcdeabcdeabc", "abbce", "cde", "cdea", "Foo"), v); + } + + @Test + public void testBinaryStringFunctions() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "x'45F0AB' || x'45F0AB'", + "POSITION(x'F0' IN x'453423F0ABBC')", + "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3)" + // "SUBSTRING(x'453423F0ABBC' FROM 3)", + // "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4)" + )); + + // TODO: Calcite 1.9.0 has bugs on binary SUBSTRING functions + // as there's no SqlFunctions.substring(org.apache.calcite.avatica.util.ByteString, ...) + // commented out testing substring function + + assertEquals("45f0ab45f0ab", v.get(0).toString()); + assertEquals(4, v.get(1)); + assertEquals("45344534abbc45", v.get(2).toString()); + // assertEquals("23f0abbc", v.get(3).toString()); + // assertEquals("23f0ab", v.get(4).toString()); + } + + @Test + public void testDateAndTimestampLiteral() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "DATE '1970-05-15' AS datefield", + "TIME '00:00:00' AS timefield", + "TIMESTAMP '2016-01-01 00:00:00' as timestampfield" + ) + ); + + assertEquals(3, v.size()); + assertEquals(134, v.get(0)); + assertEquals(0, v.get(1)); + assertEquals(1451606400000L, v.get(2)); + } + + @Test + public void testInterval() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "INTERVAL '1-5' YEAR TO MONTH AS intervalfield", + "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field" + ) + ); + + assertEquals(3, v.size()); + assertEquals(17, v.get(0)); + assertEquals(0, v.get(1)); + assertEquals(14, v.get(2)); + } + + @Test + public void testDateFunctions() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "LOCALTIME = CURRENT_TIME, LOCALTIMESTAMP = CURRENT_TIMESTAMP, CURRENT_DATE", + "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')", + "FLOOR(DATE '2016-01-23' TO MONTH)", + "CEIL(TIME '12:34:56' TO MINUTE)" + ) + ); + + assertEquals(6, v.size()); + assertTrue((boolean) v.get(0)); + assertTrue((boolean) v.get(1)); + // skip checking CURRENT_DATE since we don't inject dataContext so don't know about current timestamp + // we can do it from trident test + assertEquals(1L, v.get(3)); + assertEquals(0L, v.get(4)); + assertEquals(45300000, v.get(5)); + } + + @Test + public void testJDBCNumericFunctions() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "{fn POWER(3, 2)}", "{fn ABS(-10)}", "{fn MOD(10, 3)}", "{fn MOD(-10, 3)}" + )); + + assertEquals(new Values(9.0d, 10, 1, -1), v); + + // Belows are floating numbers so comparing this with literal is tend to be failing... + // Picking int value and compare + Values v2 = testExpr( + Lists.newArrayList( + "{fn LOG(16)}", "{fn LOG10(10000)}", "{fn EXP(10)}" + )); + List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() { + @Nullable + @Override + public Object apply(@Nullable Object o) { + // only takes int value + return ((Number) o).intValue(); + } + }); + + // 2.7725, 4.0, 22026.465794 + assertEquals(new Values(2, 4, 22026), v2m); + } + + @Test + public void testJDBCStringFunctions() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "{fn CONCAT('ab', 'cd')}", + "{fn LOCATE('bc', 'abcdeabcde')}", + //"{fn LOCATE('bc', 'abcdeabcde', 4)}", + "{fn INSERT('abcd', 2, 3, 'de')}", + "{fn LCASE('AbCdE')}", + "{fn LENGTH('AbCdE')}", + //"{fn LTRIM(' abcde ')}", + //"{fn RTRIM(' abcde ')}", + "{fn SUBSTRING('abcdeabcde', 3, 4)}", + "{fn UCASE('AbCdE')}" + ) + ); + + // TODO: Calcite 1.9.0 doesn't support {fn LOCATE(string1, string2 [, integer])} + // while it's on support list on SQL reference + // and bugs on LTRIM and RTRIM : throwing AssertionError: Internal error: pre-condition failed: pos != null + // commented out problematic function tests + + assertEquals(new Values("abcd", 2, "ade", "abcde", 5, "cdea", "ABCDE"), v); + } + + @Test + public void testJDBCDateTimeFunctions() throws Exception { + Values v = testExpr( + Lists.newArrayList( + "{fn CURDATE()} = CURRENT_DATE", "{fn CURTIME()} = LOCALTIME", "{fn NOW()} = LOCALTIMESTAMP", + "{fn QUARTER(DATE '2016-10-07')}", "{fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}", + "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}" + ) + ); + + assertEquals(new Values(true, true, true, 4L, 1475799300000L, 86400), v); } private Values testExpr(List<String> exprs) throws Exception { http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java index 547114f..bd5bde9 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java @@ -71,9 +71,9 @@ public class TestPlanCompiler { @Test public void testNested() throws Exception { - String sql = "SELECT ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " + + String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " + "FROM FOO " + - "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 200"; + "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql); PlanCompiler compiler = new PlanCompiler(typeFactory); AbstractValuesProcessor proc = compiler.compile(state.tree()); @@ -84,7 +84,7 @@ public class TestPlanCompiler { proc.initialize(data, h); Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4); Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map); - Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0)); + Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0)); } @Test http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java index f40e138..f0eed27 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java @@ -59,7 +59,7 @@ public class TestRelNodeCompiler { // standalone mode doesn't use inputstreams argument compiler.visitProject(project, Collections.EMPTY_LIST); pw.flush(); - Assert.assertThat(sw.toString(), containsString("plus(")); + Assert.assertThat(sw.toString(), containsString(" + 1")); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java index 85510a0..c1ceeba 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java @@ -19,8 +19,12 @@ */ package org.apache.storm.sql.compiler.backends.trident; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.util.DateTimeUtils; +import org.apache.storm.sql.runtime.calcite.StormDataContext; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.commons.collections4.CollectionUtils; @@ -40,6 +44,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.time.ZoneOffset; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,6 +56,7 @@ import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues; public class TestPlanCompiler { private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT); + private final DataContext dataContext = new StormDataContext(); @Before public void setUp() { @@ -63,7 +70,7 @@ public class TestPlanCompiler { TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); final Map<String, ISqlTridentDataSource> data = new HashMap<>(); data.put("FOO", new TestUtils.MockSqlTridentDataSource()); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -74,13 +81,37 @@ public class TestPlanCompiler { } @Test + public void testSubquery() throws Exception { + final int EXPECTED_VALUE_SIZE = 1; + // TODO: add visit method with LogicalValues in PostOrderRelNodeVisitor and handle it properly + // String sql = "SELECT ID FROM FOO WHERE ID IN (SELECT 2)"; + + // TODO: below subquery doesn't work but below join query with subquery as table works. + // They're showing different logical plan (former is more complicated) so there's a room to apply rules to improve. + // String sql = "SELECT ID FROM FOO WHERE ID IN (SELECT ID FROM FOO WHERE NAME = 'abc')"; + + String sql = "SELECT F.ID FROM FOO AS F JOIN (SELECT ID FROM FOO WHERE NAME = 'abc') AS F2 ON F.ID = F2.ID"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + final Map<String, ISqlTridentDataSource> data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + Assert.assertArrayEquals(new Values[] { new Values(2)}, getCollectedValues().toArray()); + } + + @Test public void testCompileGroupByExp() throws Exception { final int EXPECTED_VALUE_SIZE = 1; final Map<String, ISqlTridentDataSource> data = new HashMap<>(); data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource()); String sql = "SELECT GRPID, COUNT(*) AS CNT, MAX(AGE) AS MAX_AGE, MIN(AGE) AS MIN_AGE, AVG(AGE) AS AVG_AGE, MAX(AGE) - MIN(AGE) AS DIFF FROM FOO GROUP BY GRPID"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyGroupByTable(sql); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -98,7 +129,7 @@ public class TestPlanCompiler { data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource()); String sql = "SELECT GRPID, COUNT(*) AS CNT, MAX(SCORE - AGE) AS MAX_SCORE_MINUS_AGE FROM FOO GROUP BY GRPID"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyGroupByTable(sql); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -117,7 +148,7 @@ public class TestPlanCompiler { data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept()); String sql = "SELECT d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.EMPID > 0 GROUP BY d.DEPTID"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -136,7 +167,7 @@ public class TestPlanCompiler { data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept()); String sql = "SELECT d.DEPTID, e.DEPTID FROM DEPT AS d LEFT OUTER JOIN EMP AS e ON d.DEPTID = e.DEPTID WHERE e.EMPID is null"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -155,7 +186,7 @@ public class TestPlanCompiler { data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept()); String sql = "SELECT d.DEPTID, e.DEPTID FROM EMP AS e RIGHT OUTER JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.EMPID is null"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -174,7 +205,7 @@ public class TestPlanCompiler { data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept()); String sql = "SELECT e.DEPTID, d.DEPTNAME FROM EMP AS e FULL OUTER JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE (d.DEPTNAME is null OR e.EMPNAME is null)"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -195,28 +226,11 @@ public class TestPlanCompiler { final Map<String, ISqlTridentDataSource> data = new HashMap<>(); data.put("FOO", new TestUtils.MockSqlTridentDataSource()); data.put("BAR", new TestUtils.MockSqlTridentDataSource()); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); - Assert.assertArrayEquals(new Values[] { new Values(4, "x", "y")}, getCollectedValues().toArray()); - } - - @Test - public void testLogicalExpr() throws Exception { - final int EXPECTED_VALUE_SIZE = 1; - String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2"; - TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); - Map<String, ISqlTridentDataSource> data = new HashMap<>(); - data.put("FOO", new TestUtils.MockSqlTridentDataSource()); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); - AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); - final TridentTopology topo = proc.build(data); - Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), - f, new TestUtils.MockStateUpdater(), new Fields()); - runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); - Assert.assertArrayEquals(new Values[] { new Values(true, false, true) }, getCollectedValues().toArray()); + Assert.assertArrayEquals(new Values[] { new Values(4, "abcde", "y")}, getCollectedValues().toArray()); } @Test @@ -228,7 +242,7 @@ public class TestPlanCompiler { TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); Map<String, ISqlTridentDataSource> data = new HashMap<>(); data.put("FOO", new TestUtils.MockSqlTridentDataSource()); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -245,7 +259,7 @@ public class TestPlanCompiler { TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyGroupByTable(sql); Map<String, ISqlTridentDataSource> data = new HashMap<>(); data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource()); - PlanCompiler compiler = new PlanCompiler(data, typeFactory); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); @@ -255,6 +269,79 @@ public class TestPlanCompiler { Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 15L, 15L) }, getCollectedValues().toArray()); } + @Test + public void testCaseStatement() throws Exception { + int EXPECTED_VALUE_SIZE = 5; + String sql = "SELECT CASE WHEN NAME IN ('a', 'abc', 'abcde') THEN UPPER('a') " + + "WHEN UPPER(NAME) = 'AB' THEN 'b' ELSE {fn CONCAT(NAME, '#')} END FROM FOO"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + + final Map<String, ISqlTridentDataSource> data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + + Assert.assertArrayEquals(new Values[]{new Values("A"), new Values("b"), new Values("A"), new Values("abcd#"), new Values("A")}, getCollectedValues().toArray()); + } + + @Test + public void testNested() throws Exception { + int EXPECTED_VALUE_SIZE = 1; + String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " + + "FROM FOO " + + "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql); + + final Map<String, ISqlTridentDataSource> data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + + Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4); + Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map); + Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))}, getCollectedValues().toArray()); + } + + @Test + public void testDateKeywords() throws Exception { + int EXPECTED_VALUE_SIZE = 1; + String sql = "SELECT " + + "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE " + + "FROM FOO " + + "WHERE ID > 0 AND ID < 2"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + + final Map<String, ISqlTridentDataSource> data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory, dataContext); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + + long utcTimestamp = (long) dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName); + long currentTimestamp = (long) dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName); + long localTimestamp = (long) dataContext.get(DataContext.Variable.LOCAL_TIMESTAMP.camelName); + + System.out.println(getCollectedValues()); + + java.sql.Timestamp timestamp = new java.sql.Timestamp(utcTimestamp); + int dateInt = (int) timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toLocalDate().toEpochDay(); + int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY); + int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY); + + Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt)}, getCollectedValues().toArray()); + } + private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc, TridentTopology topo) throws Exception { final Config conf = new Config(); http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml index d911f9b..b02bac5 100644 --- a/external/sql/storm-sql-runtime/pom.xml +++ b/external/sql/storm-sql-runtime/pom.xml @@ -56,14 +56,6 @@ <artifactId>jsr305</artifactId> </exclusion> <exclusion> - <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.janino</groupId> - <artifactId>commons-compiler</artifactId> - </exclusion> - <exclusion> <groupId>org.pentaho</groupId> <artifactId>pentaho-aggdesigner-algorithm</artifactId> </exclusion> @@ -77,11 +69,6 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> - <!-- janino --> - <dependency> - <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> - </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java new file mode 100644 index 0000000..aa7e435 --- /dev/null +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.interpreter; + +import org.apache.calcite.DataContext; + +import java.io.Serializable; + +/** + * This is a hack to use Calcite Context. + */ +public class StormContext extends Context implements Serializable { + public StormContext(DataContext root) { + super(root); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java new file mode 100644 index 0000000..4861b43 --- /dev/null +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.sql.runtime.calcite; + +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.util.Holder; + +import java.io.Serializable; +import java.util.Calendar; +import java.util.TimeZone; + +/** + * This is based on SlimDataContext in Calcite, and borrow some from DataContextImpl in Calcite. + */ +public class StormDataContext implements DataContext, Serializable { + private final ImmutableMap<Object, Object> map; + + public StormDataContext() { + // Store the time at which the query started executing. The SQL + // standard says that functions such as CURRENT_TIMESTAMP return the + // same value throughout the query. + + final Holder<Long> timeHolder = Holder.of(System.currentTimeMillis()); + + // Give a hook chance to alter the clock. + Hook.CURRENT_TIME.run(timeHolder); + final long time = timeHolder.get(); + final TimeZone timeZone = Calendar.getInstance().getTimeZone(); + final long localOffset = timeZone.getOffset(time); + final long currentOffset = localOffset; + + ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder(); + builder.put(Variable.UTC_TIMESTAMP.camelName, time) + .put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset) + .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset) + .put(Variable.TIME_ZONE.camelName, timeZone); + map = builder.build(); + } + + @Override + public SchemaPlus getRootSchema() { + return null; + } + + @Override + public JavaTypeFactory getTypeFactory() { + return null; + } + + @Override + public QueryProvider getQueryProvider() { + return null; + } + + @Override + public Object get(String name) { + return map.get(name); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java index 7afa096..9aa8b76 100644 --- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java @@ -18,6 +18,9 @@ */ package org.apache.storm.sql.runtime.trident.functions; +import org.apache.calcite.DataContext; +import org.apache.calcite.interpreter.Context; +import org.apache.calcite.interpreter.StormContext; import org.apache.storm.trident.operation.BaseFilter; import org.apache.storm.trident.operation.TridentOperationContext; import org.apache.storm.trident.tuple.TridentTuple; @@ -27,25 +30,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; -import java.util.List; import java.util.Map; public class EvaluationFilter extends BaseFilter { private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class); private transient ScriptEvaluator evaluator; + private final String expression; + private final DataContext dataContext; + private final Object[] outputValues; - public EvaluationFilter(String expression) { + public EvaluationFilter(String expression, DataContext dataContext) { this.expression = expression; + this.dataContext = dataContext; + this.outputValues = new Object[1]; } @Override public void prepare(Map conf, TridentOperationContext context) { LOG.info("Expression: {}", expression); try { - evaluator = new ScriptEvaluator(expression, Boolean.class, - new String[] {"_data"}, new Class[] { List.class }); + evaluator = new ScriptEvaluator(expression, int.class, + new String[] {"context", "outputValues"}, + new Class[] { Context.class, Object[].class }); } catch (CompileException e) { throw new RuntimeException(e); } @@ -54,7 +62,10 @@ public class EvaluationFilter extends BaseFilter { @Override public boolean isKeep(TridentTuple tuple) { try { - return (Boolean) evaluator.evaluate(new Object[] {tuple.getValues()}); + Context calciteContext = new StormContext(dataContext); + calciteContext.values = tuple.getValues().toArray(); + evaluator.evaluate(new Object[] {calciteContext, outputValues}); + return (outputValues[0] != null && (boolean) outputValues[0]); } catch (InvocationTargetException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java index b0bbce3..4a48766 100644 --- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java @@ -18,6 +18,9 @@ */ package org.apache.storm.sql.runtime.trident.functions; +import org.apache.calcite.DataContext; +import org.apache.calcite.interpreter.Context; +import org.apache.calcite.interpreter.StormContext; import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.operation.TridentOperationContext; @@ -29,25 +32,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; -import java.util.List; import java.util.Map; public class EvaluationFunction extends BaseFunction { private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class); private transient ScriptEvaluator evaluator; + private final String expression; + private final Object[] outputValues; + private final DataContext dataContext; - public EvaluationFunction(String expression) { + public EvaluationFunction(String expression, int outputCount, DataContext dataContext) { this.expression = expression; + this.outputValues = new Object[outputCount]; + this.dataContext = dataContext; } @Override public void prepare(Map conf, TridentOperationContext context) { LOG.info("Expression: {}", expression); try { - evaluator = new ScriptEvaluator(expression, Values.class, - new String[] {"_data"}, new Class[] { List.class }); + evaluator = new ScriptEvaluator(expression, int.class, + new String[] {"context", "outputValues"}, + new Class[] { Context.class, Object[].class }); } catch (CompileException e) { throw new RuntimeException(e); } @@ -56,7 +64,11 @@ public class EvaluationFunction extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { try { - collector.emit((Values) evaluator.evaluate(new Object[] {tuple.getValues()})); + Context calciteContext = new StormContext(dataContext); + calciteContext.values = tuple.getValues().toArray(); + evaluator.evaluate( + new Object[]{calciteContext, outputValues}); + collector.emit(new Values(outputValues)); } catch (InvocationTargetException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java index 7648f58..0a3bac6 100644 --- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java +++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java @@ -19,26 +19,22 @@ */ package org.apache.storm.sql; -import org.apache.storm.ILocalCluster; -import org.apache.storm.LocalCluster; +import org.apache.storm.sql.runtime.ChannelContext; +import org.apache.storm.sql.runtime.ChannelHandler; +import org.apache.storm.sql.runtime.DataSource; +import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer; import org.apache.storm.task.IMetricsContext; import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.spout.IBatchSpout; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; import org.apache.storm.trident.state.StateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import org.apache.storm.sql.runtime.ChannelContext; -import org.apache.storm.sql.runtime.ChannelHandler; -import org.apache.storm.sql.runtime.DataSource; -import org.apache.storm.sql.runtime.ISqlTridentDataSource; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.Function; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; -import org.apache.storm.trident.tuple.TridentTuple; import java.util.ArrayList; import java.util.Arrays; @@ -264,9 +260,11 @@ public class TestUtils { private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR"); public MockSpout() { - for (int i = 0; i < 5; ++i) { - RECORDS.add(new Values(i, "x", "y")); - } + RECORDS.add(new Values(0, "a", "y")); + RECORDS.add(new Values(1, "ab", "y")); + RECORDS.add(new Values(2, "abc", "y")); + RECORDS.add(new Values(3, "abcd", "y")); + RECORDS.add(new Values(4, "abcde", "y")); } private boolean emitted = false; @@ -487,6 +485,71 @@ public class TestUtils { } } + public static class MockSqlTridentNestedDataSource implements ISqlTridentDataSource { + @Override + public IBatchSpout getProducer() { + return new MockSpout(); + } + + @Override + public SqlTridentConsumer getConsumer() { + return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater()); + } + + private static class MockSpout implements IBatchSpout { + private final ArrayList<Values> RECORDS = new ArrayList<>(); + private final Fields OUTPUT_FIELDS = new Fields("ID", "MAPFIELD", "NESTEDMAPFIELD", "ARRAYFIELD"); + + public MockSpout() { + List<Integer> ints = Arrays.asList(100, 200, 300); + for (int i = 0; i < 5; ++i) { + Map<String, Integer> map = new HashMap<>(); + map.put("b", i); + map.put("c", i*i); + Map<String, Map<String, Integer>> mm = new HashMap<>(); + mm.put("a", map); + RECORDS.add(new Values(i, map, mm, ints)); + } + } + + private boolean emitted = false; + + @Override + public void open(Map conf, TopologyContext context) { + } + + @Override + public void emitBatch(long batchId, TridentCollector collector) { + if (emitted) { + return; + } + + for (Values r : RECORDS) { + collector.emit(r); + } + emitted = true; + } + + @Override + public void ack(long batchId) { + } + + @Override + public void close() { + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public Fields getOutputFields() { + return OUTPUT_FIELDS; + } + } + } + public static class CollectDataChannelHandler implements ChannelHandler { private final List<Values> values; http://git-wip-us.apache.org/repos/asf/storm/blob/996e6afb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 28ba98e..b44a8e7 100644 --- a/pom.xml +++ b/pom.xml @@ -264,8 +264,9 @@ <junit.version>4.11</junit.version> <metrics-clojure.version>2.5.1</metrics-clojure.version> <hdrhistogram.version>2.1.7</hdrhistogram.version> - <calcite.version>1.4.0-incubating</calcite.version> - <janino.version>2.7.8</janino.version> + + <calcite.version>1.10.0</calcite.version> + <jackson.version>2.6.3</jackson.version> <maven-surefire.version>2.18.1</maven-surefire.version> <!-- Kafka version used by old storm-kafka spout code --> @@ -908,12 +909,6 @@ <artifactId>calcite-core</artifactId> <version>${calcite.version}</version> </dependency> - <!-- used by storm-sql-core and storm-sql-runtime --> - <dependency> - <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> - <version>${janino.version}</version> - </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId>