Local Combiner for GroupByKey on Flink Streaming jobs

2023-05-23 Thread Talat Uyarer via dev
Sorry for cross-posting.

-- Forwarded message -
From: Talat Uyarer 
Date: Fri, May 19, 2023, 2:25 AM
Subject: Local Combiner for GroupByKey on Flink Streaming jobs
To: 


Hi,

I have a stream aggregation job running on Flink 1.13, and I generate the
DAG by using Beam SQL. My SQL query has a TUMBLE window. Basically, my
pipeline reads from Kafka, counts/sums some values by streaming
aggregation, and writes to a sink.

Beam SQL uses GroupByKey for the aggregation part. When I read the
translation code for the GroupByKey class in the Flink Runner [1], I could
not see any local combiner. I see ReducerFunction, but I believe it works on
the reducer side. If this is true, how can I implement a local reducer in the
source step to improve shuffle performance? Or am I missing something?
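
To make the question concrete, the workaround I am considering is an
in-bundle pre-aggregation DoFn. This is only a rough sketch of my own idea
(not existing Beam or Flink Runner code), assuming a KV<String, Long> input:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;

// Sums values per key inside each bundle and emits one partial sum per key
// when the bundle finishes, so the GroupByKey shuffle receives far fewer
// elements.
class BundleLocalSum extends DoFn<KV<String, Long>, KV<String, Long>> {
  private transient Map<String, Long> partials;

  @StartBundle
  public void startBundle() {
    partials = new HashMap<>();
  }

  @ProcessElement
  public void process(@Element KV<String, Long> element) {
    partials.merge(element.getKey(), element.getValue(), Long::sum);
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext context) {
    for (Map.Entry<String, Long> entry : partials.entrySet()) {
      // Caveat: this collapses timestamps and windows; a correct version
      // would buffer per (key, window) to keep the TUMBLE semantics intact.
      context.output(
          KV.of(entry.getKey(), entry.getValue()),
          GlobalWindow.INSTANCE.maxTimestamp(),
          GlobalWindow.INSTANCE);
    }
  }
}

I am not sure this is the right direction, which is part of why I am asking.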

If you need more information about my pipeline, I share some details below.

Thanks
[1]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L905


This is my SQL query: "SELECT log_source_id, SUM(size) AS total_size FROM
PCOLLECTION GROUP BY log_source_id, TUMBLE(log_time, INTERVAL '1' MINUTE)"
When I submit the job, Flink generates two fused steps: a source step and an
aggregation/sink step. I share the task names below.
The first step is the source step:
Source:
Kafka_IO/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
->
Flat Map ->
ParMultiDo(AvroBytesToRowConverter) ->
BeamCalcRel_47/ParDo(Calc)/ParMultiDo(Calc) ->
BeamAggregationRel_48/assignEventTimestamp/AddTimestamps/ParMultiDo(AddTimestamps)
->
BeamAggregationRel_48/Window.Into()/Window.Assign.out ->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/selectKeys/AddKeys/Map/ParMultiDo(Anonymous)
->
ToBinaryKeyedWorkItem

The second step is the aggregation and sink step:

BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/GroupByKey ->
ToGBKResult ->
BeamAggregationRel_48/Group.CombineFieldsByFields/Combine/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToRow/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/mergeRecord/ParMultiDo(Anonymous) ->
BeamCalcRel_49/ParDo(Calc)/ParMultiDo(Calc) ->
ParMultiDo(RowToOutputFormat) ->
ParMultiDo(SinkProcessor)


Re: Watermark Alignment on Flink Runner's UnboundedSourceWrapper

2023-05-23 Thread Talat Uyarer via dev
Hi Jan,

Yes, my plan is to implement this feature in the FlinkRunner. I have one more
question: does the Flink Runner support Flink event time or Beam's custom
watermarks? Do I need to set AutoWatermarkInterval for stateful Beam Flink
jobs, or can Beam timers handle it without setting that parameter?
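
For reference, this is the kind of configuration I am asking about (a
minimal sketch; I am assuming FlinkPipelineOptions exposes
setAutoWatermarkInterval, please correct me if the option name differs):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WatermarkIntervalSetup {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Ask the runner to emit watermarks every 200 ms; my question is whether
    // a stateful job needs this for Beam event-time timers to fire.
    options.setAutoWatermarkInterval(200L);
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline and call pipeline.run() ...
  }
}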

Thanks

On Tue, May 23, 2023 at 12:03 AM Jan Lukavský  wrote:

> Hi Talat,
>
> your analysis is correct, aligning watermarks for jobs with high watermark
> skew in input partitions really results in faster checkpoints and reduces
> the size of state. There are generally two places you can implement this -
> in user code (the source) or inside runner. The user code can use some
> external synchronization (e.g. ZooKeeper) to keep track of progress of all
> individual sources. Another option is to read the watermark from Flink's
> REST API (some inspiration here [1]).
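>
> For illustration, the polling could look roughly like this (only a sketch;
> the REST path and the currentOutputWatermark metric name are from memory
> and need verifying against your Flink version):
>
> import java.io.IOException;
> import java.net.URI;
> import java.net.http.HttpClient;
> import java.net.http.HttpRequest;
> import java.net.http.HttpResponse;
>
> class FlinkWatermarkPoller {
>   private final HttpClient client = HttpClient.newHttpClient();
>
>   // Fetches the watermark metric of one job vertex as JSON; the caller
>   // would take the minimum across vertices and pause readers running ahead.
>   String fetchWatermarkJson(String restBase, String jobId, String vertexId)
>       throws IOException, InterruptedException {
>     HttpRequest request = HttpRequest.newBuilder()
>         .uri(URI.create(restBase + "/jobs/" + jobId + "/vertices/" + vertexId
>             + "/metrics?get=currentOutputWatermark"))
>         .build();
>     return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
>   }
> }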
>
> Another option would be to make use of [2] and implement this directly in
> the FlinkRunner. I'm not familiar with any possible limitations of this; it
> was added to Flink quite recently (we would have to support it only when
> running on Flink 1.15+).
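>
> For reference, the native API added by [2] looks like this (a sketch per
> the FLIP, available on Flink 1.15+):
>
> import java.time.Duration;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>
> // Sources sharing the "alignment-group" label pause splits whose watermark
> // drifts more than 20 s ahead of the group's minimum watermark.
> WatermarkStrategy<String> strategy =
>     WatermarkStrategy
>         .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
>         .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20));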
>
> If you would like to go for the second approach, I'd be happy to help with
> some guidance.
>
> Best,
>
>  Jan
>
> [1]
> https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> On 5/23/23 01:05, Talat Uyarer via dev wrote:
>
> Maybe the user list does not have knowledge about this, so I am also
> resending on the dev list. Sorry for cross-posting.
>
>
> Hi All,
>
> I have a stream aggregation job which reads from Kafka and writes to some
> sinks.
>
> When I submit my job, the Flink checkpoint size keeps increasing if I use
> unaligned checkpoint settings, and the job does not emit any window results.
> If I use aligned checkpoints, the size is somewhat under control (still
> big), but checkpoint alignment takes a long time.
>
> I would like to implement something similar to [1]. I believe that if
> UnboundedSourceWrapper pauses reading partitions whose watermarks have run
> ahead, it will reduce the size of the checkpoint and I can use unaligned
> checkpointing. What do you think about this approach? Do you have another
> solution?
>
> One more question: while reading code to implement the above idea, I saw
> this code [2]. Does the Flink Runner have a similar implementation?
>
> Thanks
>
> [1] https://github.com/apache/flink/pull/11968
> [2]
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207
>
>


Watermark Alignment on Flink Runner's UnboundedSourceWrapper

2023-05-22 Thread Talat Uyarer via dev
Maybe the user list does not have knowledge about this, so I am also
resending on the dev list. Sorry for cross-posting.


Hi All,

I have a stream aggregation job which reads from Kafka and writes to some
sinks.

When I submit my job, the Flink checkpoint size keeps increasing if I use
unaligned checkpoint settings, and the job does not emit any window results.
If I use aligned checkpoints, the size is somewhat under control (still
big), but checkpoint alignment takes a long time.

I would like to implement something similar to [1]. I believe that if
UnboundedSourceWrapper pauses reading partitions whose watermarks have run
ahead, it will reduce the size of the checkpoint and I can use unaligned
checkpointing. What do you think about this approach? Do you have another
solution?
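
Roughly what I have in mind, as a hypothetical sketch (this is not existing
UnboundedSourceWrapper code; the helper and the drift threshold are made up
for illustration):

import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;

class AlignedAdvance {
  // Advance only readers whose watermark is within maxDriftMillis of the
  // slowest reader; paused readers leave their backlog in Kafka instead of
  // letting it pile up in checkpointed state.
  static void advanceAligned(List<UnboundedReader<?>> readers, long maxDriftMillis)
      throws IOException {
    long minWatermark = Long.MAX_VALUE;
    for (UnboundedReader<?> reader : readers) {
      minWatermark = Math.min(minWatermark, reader.getWatermark().getMillis());
    }
    for (UnboundedReader<?> reader : readers) {
      if (reader.getWatermark().getMillis() - minWatermark > maxDriftMillis) {
        continue; // pause: this split is too far ahead of the slowest one
      }
      if (reader.advance()) {
        // hand reader.getCurrent() to the normal emission path here
      }
    }
  }
}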

One more question: while reading code to implement the above idea, I saw
this code [2]. Does the Flink Runner have a similar implementation?

Thanks

[1] https://github.com/apache/flink/pull/11968
[2]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207


Re: Beam SQL Alias issue while using With Clause

2023-03-02 Thread Talat Uyarer via dev
Hi Andrew,

Thank you so much for your help. Sorry to hear you changed teams :( I can
handle the Calcite upgrade if there is a fix. I was working on a Calcite
upgrade, but then we started having so many issues; that's why I stopped.
Talat

On Thu, Mar 2, 2023 at 11:56 AM Andrew Pilloud  wrote:

> Hi Talat,
>
> I managed to turn your test case into something against Calcite. It
> looks like there is a bug affecting tables that contain one or more
> single-element structs and no multi-element structs. I've sent the
> details to the Calcite mailing list here.
>
> https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto
>
> I'm experimenting with ideas on how to work around this but a fix will
> likely require a Calcite upgrade, which is not something I'd have time
> to help with. (I'm not on the Google Beam team anymore.)
>
> Andrew
>
> On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer
>  wrote:
> >
> > Hi @Andrew Pilloud
> >
> > Sorry for the late response. Yes, your test works fine. I changed the
> > test input structure to match our input structure; now this test also
> > hits the same exception.
> >
> > Feb 21, 2023 2:02:28 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQL:
> > WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`, `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
> > FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
> > WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
> > FROM `tempTable` AS `tempTable`)
> > Feb 21, 2023 2:02:28 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQLPlan>
> > LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
> >   LogicalFilter(condition=[=($0.name, 'innerStr')])
> >     LogicalProject(name=[$0.name], id=[$1], value=[$2])
> >       BeamIOSourceRel(table=[[beam, panwRowTestTable]])
> >
> >
> > fieldList must not be null, type = VARCHAR
> > java.lang.AssertionError: fieldList must not be null, type = VARCHAR
> >
> > I don't know what is different from yours. I am also sharing my version
> > of the test.
> >
> >
> > Index: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > IDEA additional info:
> > Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> > <+>UTF-8
> > ===
> > diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (revision fd383fae1adc545b6b6a22b274902cda956fec49)
> > +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (date 1677017032324)
> > @@ -54,6 +54,9 @@
> >    private static final Schema innerRowSchema =
> >        Schema.builder().addStringField("string_field").addInt64Field("long_field").build();
> >
> > +  private static final Schema innerPanwRowSchema =
> > +      Schema.builder().addStringField("name").build();
> > +
> >    private static final Schema innerRowWithArraySchema =
> >        Schema.builder()
> >            .addStringField("string_field")
> > @@ -127,8 +130,12 @@
> >                .build()))
> >            .put(
> >                "basicRowTestTable",
> > -              TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
> > -                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> > +              TestBoundedTable.of(FieldType.row(innerRowSchema), "col", FieldType.INT64, "field")
> > +                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L))
> > +          .put(
> > +              "panwRowTestTable",
> > +              TestBoundedTable.of(FieldType.row(innerPanwRowSchema), "user_info", FieldType.INT64, "id", FieldType.STRING, "value")
> > +                  .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L, "some_value"))
> >            .put(
> >                "rowWithArrayTestTable",
> >                TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
> > @@ -219,6 +226,21 @@
> >              .build());
> >      pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> >    }
> > +
> > +  @Test
> > +  public void testBasicRowWhereField() {
> > +    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> > +    PCollection<Row> stream =
> > +        BeamSqlRelUtils.toPCollection(
> > +            pipeline, 

Re: Review ask for Flink Runner Backlog Metric Bug Fix

2023-02-23 Thread Talat Uyarer via dev
Would you like to be a volunteer, +Andrew Pilloud? :)


On Thu, Feb 23, 2023 at 4:51 PM Andrew Pilloud  wrote:

> The bot says there are no reviewers for Flink. Possibly you'll find a
> volunteer to review it here?
>
> On Thu, Feb 23, 2023 at 4:47 PM Talat Uyarer via dev 
> wrote:
>
>> Hi,
>>
>> I created a bugfix for the Flink Runner backlog metrics. I asked the
>> OWNERS and tried to run the assign-reviewer command, but I am not sure I
>> pressed the right button :)
>>
>> If you know someone who can review this change,
>> https://github.com/apache/beam/pull/25554
>>
>> could you assign them to this PR?
>>
>> Thanks
>>
>


Review ask for Flink Runner Backlog Metric Bug Fix

2023-02-23 Thread Talat Uyarer via dev
Hi,

I created a bugfix for the Flink Runner backlog metrics. I asked the OWNERS
and tried to run the assign-reviewer command, but I am not sure I pressed
the right button :)

If you know someone who can review this change,
https://github.com/apache/beam/pull/25554

could you assign them to this PR?

Thanks


Re: Beam SQL Alias issue while using With Clause

2023-02-22 Thread Talat Uyarer via dev
Hi @Andrew Pilloud 

Sorry for the late response. Yes, your test works fine. I changed the test
input structure to match our input structure; now this test also hits the
same exception.

Feb 21, 2023 2:02:28 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: SQL:
WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`, `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
FROM `tempTable` AS `tempTable`)
Feb 21, 2023 2:02:28 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
  LogicalFilter(condition=[=($0.name, 'innerStr')])
    LogicalProject(name=[$0.name], id=[$1], value=[$2])
      BeamIOSourceRel(table=[[beam, panwRowTestTable]])


fieldList must not be null, type = VARCHAR
java.lang.AssertionError: fieldList must not be null, type = VARCHAR

I don't know what is different from yours. I am also sharing my version of
the test.


Index: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (revision fd383fae1adc545b6b6a22b274902cda956fec49)
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (date 1677017032324)
@@ -54,6 +54,9 @@
   private static final Schema innerRowSchema =
       Schema.builder().addStringField("string_field").addInt64Field("long_field").build();

+  private static final Schema innerPanwRowSchema =
+      Schema.builder().addStringField("name").build();
+
   private static final Schema innerRowWithArraySchema =
       Schema.builder()
           .addStringField("string_field")
@@ -127,8 +130,12 @@
               .build()))
           .put(
               "basicRowTestTable",
-              TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
-                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
+              TestBoundedTable.of(FieldType.row(innerRowSchema), "col", FieldType.INT64, "field")
+                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L))
+          .put(
+              "panwRowTestTable",
+              TestBoundedTable.of(FieldType.row(innerPanwRowSchema), "user_info", FieldType.INT64, "id", FieldType.STRING, "value")
+                  .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L, "some_value"))
           .put(
               "rowWithArrayTestTable",
               TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
@@ -219,6 +226,21 @@
             .build());
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
+
+  @Test
+  public void testBasicRowWhereField() {
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
+    PCollection<Row> stream =
+        BeamSqlRelUtils.toPCollection(
+            pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM panwRowTestTable WHERE panwRowTestTable.`user_info`.`name` = 'innerStr') SELECT * FROM tempTable"));
+    Schema outputSchema = Schema.builder().addRowField("col", innerRowSchema).addInt64Field("field").build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(outputSchema)
+                .addValues(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }

   @Test
   public void testArrayConstructor() {




On Fri, Feb 10, 2023 at 6:14 PM Andrew Pilloud  wrote:

> I have a test case that I believe should reproduce this on both head
> and 2.43 but it ends up with a different logical plan. Can you provide your
> input types?
>
> We have a class of issues around complex types
> https://github.com/apache/beam/issues/19009
> 
> I don't believe the "LogicalFilter(condition=[=($2.name
> 

Re: Beam SQL Alias issue while using With Clause

2023-02-03 Thread Talat Uyarer via dev
Hi Andrew,

Thank you for your PR. I appreciate your help in solving the issue. I reran
our tests and they are partially passing now with your fix. However, there
is one more issue with the WITH clause.

When I run the following query, Beam somehow loses the type of a column:

WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE
PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable

I haven't tested on Beam master; I ran with your latest patch on our code
base. This is the output:

14:00:30.095 [Test worker] INFO  o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`, `PCOLLECTION`.`user_info`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT `tempTable`.`id`, `tempTable`.`value`, `tempTable`.`user_info`
FROM `tempTable` AS `tempTable`)
14:00:30.106 [Test worker] DEBUG o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting SqlNode to RelNode
LogicalProject(id=[$0], value=[$1], user_info=[$2])
  LogicalFilter(condition=[=($2.name, 'User1')])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

14:00:30.107 [Test worker] DEBUG o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting SqlNode to RelNode
LogicalProject(id=[$0], value=[$1], user_info=[$2])
  LogicalFilter(condition=[=($2.name, 'User1')])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

14:00:30.109 [Test worker] INFO  o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQLPlan>
LogicalProject(id=[$0], value=[$1], user_info=[ROW($2)])
  LogicalFilter(condition=[=($2.name, 'User1')])
    LogicalProject(id=[$0], value=[$1], name=[$2.name])
      BeamIOSourceRel(table=[[beam, PCOLLECTION]])

14:00:30.173 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081; COST = {inf}
14:00:30.173 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] rels [#27]
14:00:30.173 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118: Apply rule [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] to [rel#27:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, PCOLLECTION])]
14:00:30.174 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#41 via BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)
14:00:30.175 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118 generated 1 successors: [rel#41:BeamEnumerableConverter.ENUMERABLE(input=BeamIOSourceRel#27)]
14:00:30.175 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081; COST = {inf}
14:00:30.175 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule [ProjectToCalcRule] rels [#33]
14:00:30.175 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136: Apply rule [ProjectToCalcRule] to [rel#33:LogicalProject.NONE(input=RelSubset#32,inputs=0..1,exprs=[$2.name])]
14:00:30.177 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#44 via ProjectToCalcRule
14:00:30.178 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136 generated 1 successors: [rel#44:LogicalCalc.NONE(input=RelSubset#32,expr#0..2={inputs},expr#3=$t2.name,proj#0..1={exprs},2=$t3)]
14:00:30.178 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081; COST = {inf}
14:00:30.178 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule [FilterToCalcRule] rels [#35]
14:00:30.178 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#160: Apply rule [FilterToCalcRule] to [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition==($2.name, 'User1'))]

fieldList must not be null, type = VARCHAR
java.lang.AssertionError: fieldList must not be null, type = VARCHAR
    at org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelDataTypeImpl.getFieldList(RelDataTypeImpl.java:164)
    at org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.checkValid(RexFieldAccess.java:76)
    at org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.<init>(RexFieldAccess.java:64)
    at org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:208)
    at org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:911)
    at

Re: Beam SQL Alias issue while using With Clause

2023-01-27 Thread Talat Uyarer via dev
Hi Andrew,

Yes, this also aligns with my debugging. In my reply to Kenn you can see a
SQL test which I wrote in Calcite. Somehow Calcite does not have this issue
in version 1.28.

!use post
!set outputformat mysql

#Test aliases with with clause
WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id,
"hr"."emps"."name" as v from "hr"."emps")
SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE
tempTable.v <> '11' ;
+-+---+
| ID  | value |
+-+---+
| 100 | Bill  |
| 110 | Theodore  |
| 150 | Sebastian |
| 200 | Eric  |
+-+---+
(4 rows)

!ok


On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud  wrote:

> Yes, that worked.
>
> The issue does not occur if I disable all of the following planner rules:
> CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>
> All the rules share a common call to RexProgramBuilder.mergePrograms, so I
> suspect the problem lies there. I spent some time looking but wasn't able
> to find it by code inspection, it looks like this code path is doing the
> right thing with names. I'll spend some time tomorrow trying to reproduce
> this on pure Calcite.
>
> Andrew
>
>
> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer 
> wrote:
>
>> Hi Andrew,
>>
>> Thanks for writing a test for this use case. Without a WHERE clause it
>> works as expected in our test cases too. Please add a WHERE clause to the
>> second SELECT; with the query below it does not return the column names. I
>> tested locally as well.
>>
>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>> id > 1
>>
>> Thanks
>>
>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
>> wrote:
>>
>>> +dev@beam.apache.org 
>>>
>>> I tried reproducing this but was not successful, the output schema was
>>> as expected. I added the following to BeamSqlMultipleSchemasTest.java at
>>> head. (I did discover that  PAssert.that(result).containsInAnyOrder(output)
>>> doesn't validate column names however.)
>>>
>>>   @Test
>>>   public void testSelectAs() {
>>>     PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));
>>>
>>>     PCollection<Row> result =
>>>         input.apply(SqlTransform.query("WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable"));
>>>
>>>     Schema output_schema =
>>>         Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>>>     assertThat(result.getSchema(), equalTo(output_schema));
>>>
>>>     Row output = Row.withSchema(output_schema).addValues(1, "strstr").build();
>>>     PAssert.that(result).containsInAnyOrder(output);
>>>     pipeline.run();
>>>   }
>>>
>>> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
 Hi Kenn,

 Thank you for replying to my email.

 I was under the same impression about Calcite, but I wrote a test on
 Calcite 1.28 too; it works without the issue that I see on Beam.

 Here is my test case. If you want, you can also run it on Calcite: put it
 under core/src/test/resources/sql as a text file and run the CoreQuidemTest
 class.

 !use post
 !set outputformat mysql

 #Test aliases with with clause
 WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
 "hr"."emps"."name" as v from "hr"."emps")
 SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
 tempTable.v <> '11' ;
 +-+---+
 | ID  | value |
 +-+---+
 | 100 | Bill  |
 | 110 | Theodore  |
 | 150 | Sebastian |
 | 200 | Eric  |
 +-+---+
 (4 rows)

 !ok


 On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles 
 wrote:

> Looking at the code that turns a logical CalcRel into a BeamCalcRel I
> do not see any obvious cause for this:
> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
> 
>
> I don't like to guess that upstream libraries have the bug, but in
> this case I wonder if the alias is lost in the Calcite optimizer rule for
> merging the projects and filters into a Calc.
>
> Kenn
>

Re: Beam SQL Alias issue while using With Clause

2023-01-24 Thread Talat Uyarer via dev
Hi Andrew,

Thanks for writing a test for this use case. Without a WHERE clause it works
as expected in our test cases too. Please add a WHERE clause to the second
SELECT; with the query below it does not return the column names. I tested
locally as well.

WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
id > 1

Thanks

On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud  wrote:

> +dev@beam.apache.org 
>
> I tried reproducing this but was not successful, the output schema was as
> expected. I added the following to BeamSqlMultipleSchemasTest.java at head.
> (I did discover that  PAssert.that(result).containsInAnyOrder(output)
> doesn't validate column names however.)
>
>   @Test
>   public void testSelectAs() {
>     PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));
>
>     PCollection<Row> result =
>         input.apply(SqlTransform.query("WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable"));
>
>     Schema output_schema =
>         Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>     assertThat(result.getSchema(), equalTo(output_schema));
>
>     Row output = Row.withSchema(output_schema).addValues(1, "strstr").build();
>     PAssert.that(result).containsInAnyOrder(output);
>     pipeline.run();
>   }
>
> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer 
> wrote:
>
>> Hi Kenn,
>>
>> Thank you for replying to my email.
>>
>> I was under the same impression about Calcite, but I wrote a test on
>> Calcite 1.28 too; it works without the issue that I see on Beam.
>>
>> Here is my test case. If you want, you can also run it on Calcite: put it
>> under core/src/test/resources/sql as a text file and run the
>> CoreQuidemTest class.
>>
>> !use post
>> !set outputformat mysql
>>
>> #Test aliases with with clause
>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>> "hr"."emps"."name" as v from "hr"."emps")
>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>> tempTable.v <> '11' ;
>> +-+---+
>> | ID  | value |
>> +-+---+
>> | 100 | Bill  |
>> | 110 | Theodore  |
>> | 150 | Sebastian |
>> | 200 | Eric  |
>> +-+---+
>> (4 rows)
>>
>> !ok
>>
>>
>> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles  wrote:
>>
>>> Looking at the code that turns a logical CalcRel into a BeamCalcRel I do
>>> not see any obvious cause for this:
>>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>>> 
>>>
>>> I don't like to guess that upstream libraries have the bug, but in this
>>> case I wonder if the alias is lost in the Calcite optimizer rule for
>>> merging the projects and filters into a Calc.
>>>
>>> Kenn
>>>
>>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles 
>>> wrote:
>>>
 I am not sure I understand the question, but I do see an issue.

 Context: "CalcRel" is an optimized relational operation that is
 somewhat like ParDo, with a small snippet of a single-assignment DSL
 embedded in it. Calcite will choose to merge all the projects and filters
 into the node, and then generate Java bytecode to directly execute the
 DSL.

 Problem: it looks like the CalcRel has output columns with aliases "id"
 and "v" where it should have output columns with aliases "id" and "value".

 Kenn

 On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay  wrote:

> Adding: @Andrew Pilloud  @Kenneth Knowles
> 
>
> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
> u...@beam.apache.org> wrote:
>
>> Hi All,
>>
>> I am using Beam 2.43 with Calcite SQL with Java.
>>
>> I have a query with a WITH clause and some aliasing. It looks like the
>> Beam query optimizer drops the SELECT statement's aliases after optimizing
>> my query. Can you help me identify where the problem is?
>>
>> This is my query
>> INFO: SQL:
>> WITH `tempTable` (`id`, `v`) AS (SELECT
>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT `tempTable`.`id`
>> AS `id`, `tempTable`.`v` AS `value`
>> FROM `tempTable` AS `tempTable`
>> WHERE `tempTable`.`v` <> '11')
>>
>> This is the Calcite plan: