Re: Implementing ARR_AGG
Hi Sonam, I replied directly to your draft PR. Please see my comments there and let me know if they are helpful.

On Mon, Dec 7, 2020 at 4:37 AM Sonam Ramchand <sonam.ramch...@venturedive.com> wrote:
> Hi Devs,
> I have tried to implement the ARRAY_AGG function for the ZetaSQL dialect by
> following the STRING_AGG implementation (https://github.com/apache/beam/pull/11895).
> The draft PR for ARRAY_AGG is https://github.com/apache/beam/pull/13483.
Implementing ARR_AGG
Hi Devs,

I have tried to implement the ARRAY_AGG function for the ZetaSQL dialect by following the STRING_AGG implementation (https://github.com/apache/beam/pull/11895). The draft PR for ARRAY_AGG is https://github.com/apache/beam/pull/13483. When I run the following test:

    @Test
    public void testArrayAggregation() {
      String sql =
          "SELECT ARRAY_AGG(x) AS array_agg\n"
              + "FROM UNNEST([2, 1, -2, 3, -2, 1, 2]) AS x";
      ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
      BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
      PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

      // ARRAY_AGG over INT64 values yields ARRAY<INT64>; addArrayField takes the element type,
      // and INT64 values must be Longs.
      Schema schema = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
      PAssert.that(stream)
          .containsInAnyOrder(
              Row.withSchema(schema).addArray(2L, 1L, -2L, 3L, -2L, 1L, 2L).build());
      pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

I get the following error:

    java.lang.AssertionError: type mismatch:
    aggCall type:
    BIGINT NOT NULL ARRAY NOT NULL
    inferred type:
    ARRAY NOT NULL
        at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.Litmus$1.fail(Litmus.java:31)
        at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptUtil.eq(RelOptUtil.java:1958)
        at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.typeMatchesInferred(Aggregate.java:434)
        at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.<init>(Aggregate.java:159)
        at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalAggregate.<init>(LogicalAggregate.java:65)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:113)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:50)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:102)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Collections$2.tryAdvance(Collections.java:4719)
        at java.util.Collections$2.forEachRemaining(Collections.java:4727)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:101)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convert(QueryStatementConverter.java:89)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertRootQuery(QueryStatementConverter.java:55)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:141)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:152)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlDialectSpecTest.testArrayAggregation(ZetaSqlDialectSpecTest.java:4071)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322)
        at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:266)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        ...
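For context, here is a minimal, Beam-free sketch of the accumulation logic an ARRAY_AGG aggregation needs. It assumes the aggregation is written in the shape of a Beam `Combine.CombineFn` (createAccumulator / addInput / mergeAccumulators / extractOutput), as Beam SQL's built-in aggregations generally are; the class `ArrayAggSketch` is hypothetical and does not extend any Beam type. The sketch keeps the element type explicit (`Long`, i.e. INT64/BIGINT). In the error above, the `aggCall type` carries that element type (`BIGINT NOT NULL ARRAY NOT NULL`) while the inferred type (`ARRAY NOT NULL`) does not, which suggests the return type inferred for the function may not be propagating the argument's element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of ARRAY_AGG accumulation over INT64 values.
 * Method names mirror Beam's Combine.CombineFn, but this class does not depend on Beam.
 */
public class ArrayAggSketch {

  // One empty list per group (or per partial aggregation on a worker).
  public List<Long> createAccumulator() {
    return new ArrayList<>();
  }

  // Append every input value; ARRAY_AGG keeps duplicates.
  public List<Long> addInput(List<Long> accumulator, Long input) {
    accumulator.add(input);
    return accumulator;
  }

  // Concatenate partial results; the final order depends on how the input was split.
  public List<Long> mergeAccumulators(Iterable<List<Long>> accumulators) {
    List<Long> merged = new ArrayList<>();
    for (List<Long> acc : accumulators) {
      merged.addAll(acc);
    }
    return merged;
  }

  // The output is an array (a List here) whose element type matches the input type.
  public List<Long> extractOutput(List<Long> accumulator) {
    return accumulator;
  }

  public static void main(String[] args) {
    ArrayAggSketch fn = new ArrayAggSketch();
    // Simulate two workers, each seeing part of UNNEST([2, 1, -2, 3, -2, 1, 2]).
    List<Long> left = fn.createAccumulator();
    for (long x : new long[] {2, 1, -2, 3}) {
      fn.addInput(left, x);
    }
    List<Long> right = fn.createAccumulator();
    for (long x : new long[] {-2, 1, 2}) {
      fn.addInput(right, x);
    }
    List<Long> result = fn.extractOutput(fn.mergeAccumulators(Arrays.asList(left, right)));
    System.out.println(result); // [2, 1, -2, 3, -2, 1, 2]
  }
}
```

Because the merged order depends on how a runner splits the input, and ARRAY_AGG without ORDER BY does not promise an element order, a test that compares the aggregated array element by element (as with `addArray(...)` in the test) may only be reliable on a single-bundle local run; comparing the elements regardless of order is a more robust check.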