Nested row types might be less well supported (r.g. Row) because they were
flattened before anyway.


-Rui

On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer <tuya...@paloaltonetworks.com>
wrote:

> Thank you for your response.
> I saw it and applied patch on calcite 1.20. However I realized BeamCalRel
> does not generate right code [1]to turn back Beam types. I am working on
> that now. Please let me know if apache beam support nested row types but I
> miss it.
>
>
> [1]
> https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
>
> On Fri, Feb 14, 2020 at 10:33 AM Rui Wang <ruw...@google.com> wrote:
>
>> Calcite has improved to reconstruct ROW back in the output. See [1]. Beam
>> need to update Calcite dependency to > 1.21 to adopt that.
>>
>>
>>
>> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__jira.apache.org_jira_browse_CALCITE-2D3138&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=kPxDNZSy_WpbC0xfVKTFpbSnpFAdhwMZYhSq9L-8H0g&s=jliQ_5N9_-n0EN1qXNmzeBX4m8Xhdcv_UtaHQ812L9Y&e=>
>>
>>
>> -Rui
>>
>> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to Beam SQL. But something is wrong. I have nested row
>>> records. I read them as Pcollection<Row> and apply Select * query and
>>> compare with initial rows. Looks like nested rows are flatten by calcite.
>>> How do you have any idea how can I avoid this?
>>>
>>> I added a same testcase for my issue:
>>>
>>> Schema nestedSchema =
>>>     Schema.builder()
>>>         .addInt32Field("f_nestedInt")
>>>         .addStringField("f_nestedString")
>>>         .addInt32Field("f_nestedIntPlusOne")
>>>         .build();
>>> Schema inputType =
>>>     Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>>> nestedSchema).build();
>>>
>>> PCollection<Row> input =
>>>     pipeline.apply(
>>>         Create.of(
>>>             Row.withSchema(inputType)
>>>                 .addValues(
>>>                     1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>> 313).build())
>>>                 .build())
>>>             .withRowSchema(inputType))
>>>     .setRowSchema(inputType);
>>>
>>> PCollection<Row> result =
>>>     input
>>>         .apply(
>>>             SqlTransform.query(
>>>                 "SELECT * FROM PCOLLECTION"));
>>>
>>> PAssert.that(result)
>>>     .containsInAnyOrder(Row.withSchema(inputType)
>>>         .addValues(
>>>             1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>> 313).build())
>>>         .build());
>>>
>>>
>>> Thank you so much in advance.
>>>
>>>

Reply via email to