Hi Kerry,

Casting the sample values to Long works. Is there a way to handle int fields 
as-is in a Row in a PCollection?
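
For reference, the failure mode can be reproduced with plain JDK code, no Beam 
needed: an int literal autoboxes to java.lang.Integer, which cannot be cast to 
java.lang.Long when the schema field is INT64. A minimal sketch (the class and 
helper names below are made up for illustration):

```java
// Minimal plain-JDK sketch (no Beam dependency) of the failure in the
// stack trace: an int literal autoboxes to java.lang.Integer, and the
// row builder then casts the boxed value to java.lang.Long for an
// INT64 field. Int64FieldDemo and asInt64 are hypothetical names.
public class Int64FieldDemo {

    // Loosely mimics what Row.Builder does for an INT64 field: it
    // receives the value as Object and casts it to Long.
    static Long asInt64(Object value) {
        return (Long) value;
    }

    public static void main(String[] args) {
        // Fails: 1 boxes to java.lang.Integer, which is not a Long.
        try {
            asInt64(1);
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }

        // Works: the L suffix makes the literal box to java.lang.Long,
        // which is what casting the sample values achieves, e.g.
        // Row.withSchema(SOURCE_ROW_TYPE).addValues(1L, 2L, 3L).build()
        System.out.println(asInt64(1L));
    }
}
```

So the fix Kerry describes below amounts to writing the sample values as long 
literals, e.g. addValues(1L, 2L, 3L). Beam's Schema also defines 
Schema.FieldType.INT32 for Integer-valued fields, but as far as I know the 
ZetaSQL dialect only works with INT64 integers, so Long values are the safer 
choice with this planner.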


Regards,
Ana

> 
> On 01-Feb-2022, at 9:45 AM, Kerry Donny-Clark <[email protected]> wrote:
> 
> 
> I believe the issue is that your sample data is cast to Integer, while the 
> schema expects a Long. You can explicitly cast your samples to Long. If you 
> search for that you should find some good examples.
> 
>> On Tue, Feb 1, 2022 at 8:49 AM Ananthi <[email protected]> wrote:
>> 
>> Hi Team,
>> 
>> I am trying to write test cases for a ZetaSQL transform. I tried a JUnit 
>> test for a very simple PCollection with INT64 values and am getting the 
>> error below. Am I missing anything here? I am using JUnit 4 and Beam 2.35. 
>> Please let me know if any other details are needed.
>> 
>> -----------
>> 
>> java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
>> 
>> at org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processInt64(RowUtils.java:574)
>> at org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:185)
>> at org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processRow(RowUtils.java:416)
>> at org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:163)
>> at org.apache.beam.sdk.values.Row$Builder.build(Row.java:855)
>> at com.lowes.personalization.orderreorderpipeline.OrderHistoryAndReOrderPipelineTest.testInnerJoin(OrderHistoryAndReOrderPipelineTest.java:76)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>> at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>> at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
>> 
>> 
>> ----------
>> Code -
>> private static final Schema RESULT_ROW_TYPE =
>>         Schema.builder()
>>                 .addNullableField("order_id", Schema.FieldType.INT64)
>>                 .addNullableField("site_id", Schema.FieldType.INT64)
>>                 .addNullableField("price", Schema.FieldType.INT64)
>>                 .addNullableField("order_id0", Schema.FieldType.INT64)
>>                 .addNullableField("site_id0", Schema.FieldType.INT64)
>>                 .addNullableField("price0", Schema.FieldType.INT64)
>>                 .build();
>> 
>> private static final Schema SOURCE_ROW_TYPE =
>>         Schema.builder()
>>                 .addNullableField("order_id", Schema.FieldType.INT64)
>>                 .addNullableField("site_id", Schema.FieldType.INT64)
>>                 .addNullableField("price", Schema.FieldType.INT64)
>>                 .build();
>> @Test
>> public void testInnerJoin() throws Exception {
>>     OrderHistoryReorderOptions options =
>>             PipelineOptionsFactory.as(OrderHistoryReorderOptions.class);
>>     options.setPlannerName(
>>             "org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
>> 
>>     TestPipeline pipeline = TestPipeline.fromOptions(options);
>> 
>>     Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
>>     Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues(2, 3, 3).build();
>>     Row row3 = Row.withSchema(SOURCE_ROW_TYPE).addValues(3, 4, 5).build();
>>     Row row4 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
>>     Row row5 = Row.withSchema(SOURCE_ROW_TYPE).addValues(2, 3, 3).build();
>>     Row row6 = Row.withSchema(SOURCE_ROW_TYPE).addValues(3, 4, 5).build();
>>     Row row7 = Row.withSchema(RESULT_ROW_TYPE).addValues(2, 3, 3, 1, 2, 3).build();
>>     final List<Row> inputRowsToTransform = Arrays.asList(row1, row2, row3);
>>     final List<Row> inputRowsToTransform1 = Arrays.asList(row4, row5, row6);
>>     final List<Row> outputRowsToTransform = Arrays.asList(row7);
>>     PCollection<Row> inputPcoll1 =
>>             pipeline.apply("col1",Create.of(inputRowsToTransform))
>>                     .setRowSchema(SOURCE_ROW_TYPE);
>>     PCollection<Row> inputPcoll2 =
>>             pipeline.apply("col2",Create.of(inputRowsToTransform1))
>>                     .setRowSchema(SOURCE_ROW_TYPE);
>>     String sql =
>>             "SELECT *  "
>>                     + "FROM ORDER_DETAILS1 o1"
>>                     + " JOIN ORDER_DETAILS2 o2"
>>                     + " on "
>>                     + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
>> 
>>     PAssert.that(tuple(
>>             "ORDER_DETAILS1",
>>             inputPcoll1,
>>             "ORDER_DETAILS2",
>>             inputPcoll2)
>>             .apply("join", SqlTransform.query(sql))
>>             .setRowSchema(RESULT_ROW_TYPE))
>>             .containsInAnyOrder(outputRowsToTransform);
>>     pipeline.run();
>> }
>> 
>> 
>> Thanks in advance,
>> Regards,
>> Ana
