This is an automated email from the ASF dual-hosted git repository. amaliujia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new d41dd50 [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests new 18ce248 Merge pull request #12079 from Imfuyuwei/master d41dd50 is described below commit d41dd5018f90bcac90079f68624710c811f05e50 Author: Yuwei Fu <fuyu...@google.com> AuthorDate: Wed Jun 24 06:18:45 2020 +0000 [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests --- .../impl/transform/BeamBuiltinAggregations.java | 35 ++++++++++++++++++++++ .../extensions/sql/BeamSqlDslAggregationTest.java | 32 ++++++++++++++++++++ .../sql/zetasql/SqlStdOperatorMappingTable.java | 3 +- .../sql/zetasql/ZetaSQLDialectSpecTest.java | 17 +++++++++++ 4 files changed, 86 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java index 347fdc12..ab3786b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java @@ -58,6 +58,7 @@ public class BeamBuiltinAggregations { .put("$SUM0", BeamBuiltinAggregations::createSum) .put("AVG", BeamBuiltinAggregations::createAvg) .put("BIT_OR", BeamBuiltinAggregations::createBitOr) + .put("BIT_AND", BeamBuiltinAggregations::createBitAnd) .put("VAR_POP", t -> VarianceFn.newPopulation(t.getTypeName())) .put("VAR_SAMP", t -> VarianceFn.newSample(t.getTypeName())) .put("COVAR_POP", t -> CovarianceFn.newPopulation(t.getTypeName())) @@ -185,6 +186,14 @@ public class BeamBuiltinAggregations { String.format("[%s] is not supported in BIT_OR", fieldType)); } + static CombineFn createBitAnd(Schema.FieldType fieldType) { + if (fieldType.getTypeName() == TypeName.INT64) { + return new BitAnd(); + } + throw new UnsupportedOperationException( + String.format("[%s] is not supported in BIT_AND", fieldType)); + } + static class CustMax<T extends Comparable<T>> extends Combine.BinaryCombineFn<T> { @Override public T apply(T left, T right) { @@ -383,4 +392,30 @@ public class BeamBuiltinAggregations { return accum; } } + + static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> { + @Override + public Long createAccumulator() { + return -1L; + } + + @Override + public Long addInput(Long accum, T input) { + return accum & input.longValue(); + } + + @Override + public Long mergeAccumulators(Iterable<Long> accums) { + Long merged = createAccumulator(); + for (long accum : accums) { + merged = merged & accum; + } + return merged; + } + + @Override + public Long extractOutput(Long accum) { + return accum; + } + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index 40b3b63..9c365b2 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -314,7 +314,39 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<Row> inputRows = pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA)); PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql)); + + PAssert.that(result).containsInAnyOrder(rowResult); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testBitAndFunction() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + + Schema schemaInTableA = + Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build(); + + Schema resultType = Schema.builder().addInt64Field("finalAnswer").build(); + + List<Row> rowsInTableA = + TestUtils.RowsBuilder.of(schemaInTableA) + .addRows( + 0xF001L, 0, + 0x00A1L, 0) + .getRows(); + + String sql = "SELECT bit_and(f_long) as bitand " + "FROM PCOLLECTION GROUP BY f_int2"; + + Row rowResult = Row.withSchema(resultType).addValues(1L).build(); + + PCollection<Row> inputRows = + pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA)); + PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql)); + PAssert.that(result).containsInAnyOrder(rowResult); + + pipeline.run().waitUntilFinish(); } private static class CheckerBigDecimalDivide diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java index b89e053..4675f62 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java @@ -35,6 +35,7 @@ public class SqlStdOperatorMappingTable { FunctionSignatureId.FN_ANY_VALUE, FunctionSignatureId.FN_STRING_AGG_STRING, FunctionSignatureId.FN_BIT_OR_INT64, + FunctionSignatureId.FN_BIT_AND_INT64, FunctionSignatureId.FN_OR, FunctionSignatureId.FN_NOT, FunctionSignatureId.FN_MULTIPLY_DOUBLE, @@ -226,7 +227,7 @@ public class SqlStdOperatorMappingTable { // .put("array_agg", ) // .put("array_concat_agg") .put("string_agg", SqlOperators.STRING_AGG_STRING_FN) // NULL values not supported - // .put("bit_and") + .put("bit_and", SqlStdOperatorTable.BIT_AND) // .put("bit_xor") // .put("logical_and") // .put("logical_or") diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java index 82750eb..84f3eb9 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java @@ -4613,6 +4613,23 @@ public class ZetaSQLDialectSpecTest extends ZetaSQLTestBase { } @Test + public void testZetaSQLBitAnd() { + String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + final Schema schema = Schema.builder().addInt64Field("field1").build(); + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(schema).addValue(1L).build(), + Row.withSchema(schema).addValue(0L).build()); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test public void testSimpleTableName() { String sql = "SELECT Key FROM KeyValue";