Hi Kerry, Casting sample to long works. Is there a way to handle int fields as is in a row in pcollection?
Regards, Ana > > On 01-Feb-2022, at 9:45 AM, Kerry Donny-Clark <[email protected]> wrote: > > > I believe the issue is that your sample data is cast to Integer, while the > schema expects a Long. You can explicitly cast your samples to Long. If you > search for that you should find some good examples. > >> On Tue, Feb 1, 2022 at 8:49 AM Ananthi <[email protected]> wrote: >> >> Hi Team, >> >> I am trying to write test cases for Zeta SQL transform. I just tried junit >> for a very simple pcollection with int64 values. I am getting the below >> error, Am I missing anything here? I am using Junit4 and beam version 2.35. >> Please let me know if any other details are needed. >> >> ----------- >> >> java.lang.ClassCastException: java.lang.Integer cannot be cast to >> java.lang.Long >> >> at >> org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processInt64(RowUtils.java:574) >> at >> org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:185) >> at >> org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processRow(RowUtils.java:416) >> at >> org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:163) >> at org.apache.beam.sdk.values.Row$Builder.build(Row.java:855) >> at >> com.lowes.personalization.orderreorderpipeline.OrderHistoryAndReOrderPipelineTest.testInnerJoin(OrderHistoryAndReOrderPipelineTest.java:76) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) >> at >> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >> at >> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) >> at >> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) >> at >> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) >> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) >> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) >> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) >> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) >> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) >> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) >> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) >> at org.junit.runners.ParentRunner.run(ParentRunner.java:413) >> at org.junit.runner.JUnitCore.run(JUnitCore.java:137) >> at >> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) >> at >> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) >> at >> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) >> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) >> >> >> ---------- >> Code - >> private static final Schema RESULT_ROW_TYPE = >> Schema.builder() >> .addNullableField("order_id", Schema.FieldType.INT64) >> .addNullableField("site_id", Schema.FieldType.INT64) >> .addNullableField("price", Schema.FieldType.INT64) >> .addNullableField("order_id0", Schema.FieldType.INT64) >> .addNullableField("site_id0", Schema.FieldType.INT64) >> .addNullableField("price0", Schema.FieldType.INT64) >> .build(); >> >> private static final Schema SOURCE_ROW_TYPE = >> Schema.builder() >> .addNullableField("order_id", Schema.FieldType.INT64) >> .addNullableField("site_id", Schema.FieldType.INT64) >> .addNullableField("price", Schema.FieldType.INT64) >> .build(); >> @Test >> public void testInnerJoin() throws Exception { >> OrderHistoryReorderOptions options = >> PipelineOptionsFactory.as(OrderHistoryReorderOptions.class); >> >> options.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner"); >> >> TestPipeline pipeline = TestPipeline.fromOptions(options); >> >> Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build(); >> Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build(); >> Row row3 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build(); >> Row row4 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build(); >> Row row5 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build(); >> Row row6 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build(); >> Row row7 = Row.withSchema(RESULT_ROW_TYPE).addValues( 2, 3, >> 3,1,2,3).build(); >> final List<Row> inputRowsToTransform = Arrays.asList(row1, row2, row3); >> final List<Row> inputRowsToTransform1 = Arrays.asList(row4, row5, row6); >> final List<Row> outputRowsToTransform = Arrays.asList(row7); >> PCollection<Row> inputPcoll1 = >> pipeline.apply("col1",Create.of(inputRowsToTransform)) >> .setRowSchema(SOURCE_ROW_TYPE); >> PCollection<Row> inputPcoll2 = >> pipeline.apply("col2",Create.of(inputRowsToTransform1)) >> .setRowSchema(SOURCE_ROW_TYPE); >> String sql = >> "SELECT * " >> + "FROM ORDER_DETAILS1 o1" >> + " JOIN ORDER_DETAILS2 o2" >> + " on " >> + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; >> >> PAssert.that(tuple( >> "ORDER_DETAILS1", >> inputPcoll1, >> "ORDER_DETAILS2", >> inputPcoll2) >> .apply("join", SqlTransform.query(sql)) >> .setRowSchema(RESULT_ROW_TYPE)) >> .containsInAnyOrder(outputRowsToTransform); >> pipeline.run(); >> >> >> Thanks in advance, >> Regards, >> Ana
