Re: Implementing ARR_AGG

2020-12-08 Thread Robin Qiu
Hi Sonam, I replied directly to your draft PR. Please see my comments there
and let me know if they are helpful.
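
For readers of the archive: the trace shows the assertion being raised from Calcite's Aggregate.typeMatchesInferred via RelOptUtil.eq, which compares the type declared on the aggregate call with the type the function infers for itself. One plausible reading (an assumption, not something confirmed in this thread) is that the return-type inference for the new function produces an array with no element type. The sketch below is a toy model of that comparison in plain Java; the class and method names are hypothetical and are not Calcite or Beam APIs.

```java
import java.util.Objects;

public class TypeMatchSketch {
    /** Hypothetical stand-in for a SQL array type; elementType is null when unknown. */
    static final class ArrayType {
        final String elementType;

        ArrayType(String elementType) {
            this.elementType = elementType;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ArrayType
                && Objects.equals(elementType, ((ArrayType) o).elementType);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(elementType);
        }

        @Override
        public String toString() {
            return (elementType == null ? "" : elementType + " ") + "ARRAY NOT NULL";
        }
    }

    /** Inference that ignores the operand type and yields an untyped array. */
    static ArrayType inferIgnoringOperand(String operandType) {
        return new ArrayType(null);
    }

    /** Inference that uses the operand type as the array's element type. */
    static ArrayType inferFromOperand(String operandType) {
        return new ArrayType(operandType);
    }

    /** Same shape as the failing check: the declared type must equal the inferred type. */
    static String check(ArrayType declared, ArrayType inferred) {
        if (declared.equals(inferred)) {
            return "ok";
        }
        return "type mismatch:\naggCall type:\n" + declared
            + "\ninferred type:\n" + inferred;
    }

    public static void main(String[] args) {
        ArrayType declared = new ArrayType("BIGINT NOT NULL");
        // Untyped inference reproduces the shape of the reported message.
        System.out.println(check(declared, inferIgnoringOperand("BIGINT NOT NULL")));
        // Element-aware inference agrees with the declared type.
        System.out.println(check(declared, inferFromOperand("BIGINT NOT NULL")));
    }
}
```

If this reading holds, the place to look would be wherever the new aggregate function's return type is derived, so that it carries the operand type through as the element type.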

On Mon, Dec 7, 2020 at 4:37 AM Sonam Ramchand <
sonam.ramch...@venturedive.com> wrote:

> Hi Devs,
> I have tried to implement the ARR_AGG function for the ZetaSQL dialect by
> following the STRING_AGG implementation
> (https://github.com/apache/beam/pull/11895).
> The draft PR for ARR_AGG is https://github.com/apache/beam/pull/13483.
> When I try to run the test,
>
> @Test
> public void testArrayAggregation() {
>   String sql =
>       "SELECT ARRAY_AGG(x) AS array_agg\n"
>           + "FROM UNNEST([2, 1, -2, 3, -2, 1, 2]) AS x";
>   ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
>   BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
>   PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
>   Schema schema =
>       Schema.builder()
>           .addArrayField("array_field", FieldType.of(Schema.TypeName.ARRAY))
>           .build();
>   PAssert.that(stream)
>       .containsInAnyOrder(
>           Row.withSchema(schema).addArray(2, 1, -2, 3, -2, 1, 2).build());
>   pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
> }
>
> I am getting an error,
> type mismatch:
> aggCall type:
> BIGINT NOT NULL ARRAY NOT NULL
> inferred type:
> ARRAY NOT NULL
> java.lang.AssertionError: type mismatch:
> aggCall type:
> BIGINT NOT NULL ARRAY NOT NULL
> inferred type:
> ARRAY NOT NULL
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.Litmus$1.fail(Litmus.java:31)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptUtil.eq(RelOptUtil.java:1958)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.typeMatchesInferred(Aggregate.java:434)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.<init>(Aggregate.java:159)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalAggregate.<init>(LogicalAggregate.java:65)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:113)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:50)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:102)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Collections$2.tryAdvance(Collections.java:4719)
> at java.util.Collections$2.forEachRemaining(Collections.java:4727)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:101)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convert(QueryStatementConverter.java:89)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertRootQuery(QueryStatementConverter.java:55)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:141)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:152)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlDialectSpecTest.testArrayAggregation(ZetaSqlDialectSpecTest.java:4071)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322)
> at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:266)
> at 

Implementing ARR_AGG

2020-12-07 Thread Sonam Ramchand
Hi Devs,
I have tried to implement the ARR_AGG function for the ZetaSQL dialect by
following the STRING_AGG implementation
(https://github.com/apache/beam/pull/11895).
The draft PR for ARR_AGG is https://github.com/apache/beam/pull/13483.
When I try to run the test,

@Test
public void testArrayAggregation() {
  String sql =
      "SELECT ARRAY_AGG(x) AS array_agg\n"
          + "FROM UNNEST([2, 1, -2, 3, -2, 1, 2]) AS x";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
  Schema schema =
      Schema.builder()
          .addArrayField("array_field", FieldType.of(Schema.TypeName.ARRAY))
          .build();
  PAssert.that(stream)
      .containsInAnyOrder(
          Row.withSchema(schema).addArray(2, 1, -2, 3, -2, 1, 2).build());
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

I am getting an error,
type mismatch:
aggCall type:
BIGINT NOT NULL ARRAY NOT NULL
inferred type:
ARRAY NOT NULL
java.lang.AssertionError: type mismatch:
aggCall type:
BIGINT NOT NULL ARRAY NOT NULL
inferred type:
ARRAY NOT NULL
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.Litmus$1.fail(Litmus.java:31)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptUtil.eq(RelOptUtil.java:1958)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.typeMatchesInferred(Aggregate.java:434)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.<init>(Aggregate.java:159)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalAggregate.<init>(LogicalAggregate.java:65)
at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:113)
at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:50)
at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:102)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Collections$2.tryAdvance(Collections.java:4719)
at java.util.Collections$2.forEachRemaining(Collections.java:4727)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:101)
at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convert(QueryStatementConverter.java:89)
at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertRootQuery(QueryStatementConverter.java:55)
at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:141)
at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:152)
at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlDialectSpecTest.testArrayAggregation(ZetaSqlDialectSpecTest.java:4071)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322)
at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:266)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at