Just want to clarify that Beam's concept of windowing is really an
event-time based key, and they are all processed logically simultaneously.
SQL's concept of windowing function is to sort rows and process them
linearly. They are actually totally different. From your queries it seems
you are interested in SQL's windowing functions (aka analytic functions).

I am surprised by the problems with rows, since we have used them
extensively. Hopefully it is not too hard to fix. Same with the UNION ALL

And for the CROSS JOIN it would be a nice feature to allow in some cases it
seems. Should not be hard.

Thank you for reporting this! If you have time it would be really great to
get each of these reproducible problems into GitHub issues, each.


On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi Alexey,
> Thank You for reference to that discussion I do actually have pretty
> similar thoughts on what Beam SQL needs.
> Update from my side:
> Actually did find a workaround for issue with windowing function on
> stream. It basically boils down to using sliding window to collect and
> aggregate the state. But would need an advice if this is actually a cost
> efficient method (targeting DataFlow runner). The doubt that I have is that
> this sliding window would need to have sliding interval less than 1s and
> size more than a week and be feed with quire frequent data. If I do
> understand this correctly - it would mean each input row would need to be
> duplicated for each window and stored which could be quite significant
> storage cost?
> Or actually Beam does not physically duplicate the record but just tracks
> to which windows the record currently belongs?
> And the real issue that BeamSQL needs at the moment in my opinion is
> fixing bugs.
> Some bugs that I found that prevent one from using it and would really
> appreciate fast fix:
> - UNNEST ARRAY with a nested ROW (described below, created ticket -
> https://github.com/apache/beam/issues/26911)
> - PubSub table provider actually requires all table properties to be there
> (with null in `timestampAttributeKey` it fails) - which essentially does
> not allow one to use pubsub publish timestamp as `timestampAttributeKey`.
> - its not possible to cast VARCHAR to BYTES. And BYTES is needed for
> DataStoreV1TableProvider to provide a key for storage. Also consider
> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
> requires VARCHAR instead of BYTES - its even easier in implementation.
> - Any hints on how to implement `FireStoreIOTableProvider`? I am
> considering implementing it and contributing depending on my team decision
> - but would like to get like idea how hard this task is.
> Will create tickets for the rest of issues when I will have some spare
> time.
> Best regards
> Wiśniowski Piotr
> On 22.05.2023 18:28, Alexey Romanenko wrote:
> Hi Piotr,
> Thanks for details! I cross-post this to dev@ as well since, I guess,
> people there can provide more insights on this.
> A while ago, I faced the similar issues trying to run Beam SQL against
> TPC-DS benchmark.
> We had a discussion around that [1], please, take a look since it can be
> helpful.
> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
> —
> Alexey
> On 18 May 2023, at 11:36, Wiśniowski Piotr
> <contact.wisniowskipi...@gmail.com> <contact.wisniowskipi...@gmail.com>
> wrote:
> HI,
> After experimenting with Beam SQL I did find some limitations. Testing on
> near latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`)
> with Calcite, direct runner and openjdk version "11.0.19". Please let me
> know if some of them are known/ worked on/ have tickets or have estimated
> fix time. I believe most of them are low hanging fruits or just my thinking
> is not right for the problem. If this is the case please guide me to some
> working solution.
>  From my perspective it is ok to have a fix just on master - no need to
> wait for release. Priority order:
> - 7. Windowing function on a stream - in detail - How to get previous
> message for a key? setting expiration arbitrary big is ok, but access to
> the previous record must happen fairly quickly not wait for the big window
> to finish and emit the expired keys. Ideally would like to do it in pure
> beam pipeline as saving to some external key/value store and then reading
> this here could potentially result in some race conditions which in I would
> like to avoid, but if its the only option - let it be.
> - 5. single UNION ALL possible
> - 4. UNNEST ARRAY with nested ROW
> - 3. Using * when there is Row type present in the schema
> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if
> one is a static number table
> - 2. ROW construction not supported. It is not possible to nest data
> Below queries tat I use to testing this scenarios.
> Thank You for looking at this topics!
> Best
> Wiśniowski Piotr
> -----------------------
> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
> -----------------------
> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same
> table.
> -- It is not possible to number rows
> WITH data_table AS (
> ),
> number_table AS (
> numbers_exploded AS number_item
> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS
> numbers_exploded
> )
> data_table.a,
> number_table.number_item
> FROM data_table
> CROSS JOIN number_table
> ;
> -- CROSS JOIN, JOIN ON FALSE is not supported!
> -----------------------
> -- 2. ROW construction not supported. It is not possible to nest data
> -----------------------
> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same
> type
> SELECT MAP['field1','b','field2','a']; -- null
> -- WORKAROUND - manually compose json string,
> -- drawback - decomposing might be not supported or would need to be also
> based on string operations
> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS
> `json_object`;
> -----------------------
> -- 3. Using * when there is Row type present in the schema
> -----------------------
> `ref` VARCHAR,
> `author` ROW<
> `name` VARCHAR,
> `email` VARCHAR
> >
> )
> TYPE text
> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
> TBLPROPERTIES '{"format":"json",
> "deadLetterFile":"top/python/dbt/tests/dead"}';
> SELECT * FROM test_tmp_1;
> -- java.lang.NoSuchFieldException: name
> -- WORKAROUND - refer to columns explicitly with alias
> `ref` AS ref_value,
> test_tmp_1.`author`.`name` AS author_name, -- table name must be
> referenced explicitly - this could be fixed too
> test_tmp_1.`author`.`email` AS author_name
> FROM test_tmp_1;
> -----------------------
> -- 4. UNNEST ARRAY with nested ROW
> -----------------------
> `ref` VARCHAR,
> `commits` ARRAY<ROW<
> `id` VARCHAR,
> `author` ROW<
> `name` VARCHAR,
> `email` VARCHAR
> >
> >>
> )
> TYPE text
> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
> TBLPROPERTIES '{"format":"json",
> "deadLetterFile":"python/dbt/tests/dead"}';
> test_tmp.`ref` AS branch_name,
> commit_item.`id` AS commit_hash,
> commit_item.`author`.`name` AS author_name
> FROM test_tmp
> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
> -- Row expected 4 fields (Field{name=ref, description=, type=STRING,
> options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING,
> author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
> Field{name=id, description=, type=STRING, options={{}}}, Field{name=author,
> description=, type=ROW<name STRING, email STRING>, options={{}}}).
> initialized with 5 fields.
> -- limited WORKAROUND - refer to array elements by index and UNION ALL the
> items into rows
> -- note workaround that uses number table will not work as CROSS JOIN is
> not supported
> WITH data_parsed AS (
> test_tmp.`ref` AS branch_id,
> test_tmp.commits[1].`id` AS commit_hash,
> test_tmp.commits[1].`author`.`name` AS author_name
> FROM test_tmp
> UNION ALL -- this unfortunately works only for two indexes
> test_tmp.`ref` AS branch_id,
> test_tmp.commits[2].`id` AS commit_hash,
> test_tmp.commits[2].`author`.`name` AS author_name
> FROM test_tmp
> )
> FROM data_parsed
> WHERE author_name IS NOT NULL
> ;
> -- better WORKAROUND - but tricky to get right (fragile)
> WITH data_with_number_array AS (
> test_tmp.`ref` AS branch_name, -- there must be some primary key in the
> data to join on later due to CROSS JOIN support limitation
> ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
> CARDINALITY(test_tmp.commits) AS commits_size
> FROM test_tmp
> ),
> data_with_numbers AS (
> branch_name,
> `EXPR$0` AS number_item
> FROM data_with_number_array
> CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
> WHERE `EXPR$0` <= commits_size
> ),
> data_exploded AS (
> test_tmp.`ref` AS branch_name,
> test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
> test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS
> author_name
> FROM test_tmp
> INNER JOIN data_with_numbers
> ON data_with_numbers.branch_name = test_tmp.`ref`
> )
> branch_name,
> commit_hash,
> author_name
> FROM data_exploded
> -- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join
> is not supported`
> -- as it pushes this condition as predicate pushdown to join.
> -- Is there any way to force checking this condition on here and not to
> project it upstream?
> ;
> -----------------------
> -- 5. single UNION ALL possible
> -----------------------
> SELECT 3 AS a;
> -- Wrong number of arguments to BeamUnionRel:
> org.apache.beam.sdk.values.PCollectionList@70f145ac
> -----------------------
> -- 6. Reserved names
> -----------------------
> -- json_object
> SELECT '{}' AS json_object;
> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column
> 13.
> -- WORKAROUND SELECT '{}' AS `json_object`
> -----------------------
> -- 7. Windowing function on stream
> -----------------------
> -- in detail - How to get previous message for a key?
> -- setting expiration arbitrary big is ok, but access to the previous
> record must happen fairly quickly
> -- not wait for the big window to finish and emit the expired keys.
> -- Ideally would like to do it in pure beam pipeline as saving to some
> external key/value store
> -- and then reading this here could potentially result in some race
> conditions which would be hard to debug.
> DROP TABLE IF EXISTS unbounded_stream;
> CREATE EXTERNAL TABLE unbounded_stream(
> sequence BIGINT,
> event_time TIMESTAMP
> )
> TYPE 'sequence'
> TBLPROPERTIES '{"elementsPerSecond":1}'
> ;
> CREATE EXTERNAL TABLE data_1_bounded(
> `sequence_nb` BIGINT,
> `sender_login` VARCHAR,
> `user_id` VARCHAR
> )
> TYPE text
> 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
> TBLPROPERTIES '{"format":"json",
> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
> ;
> test_data_1_unbounded AS (
> sender_login,
> user_id,
> event_time
> FROM unbounded_stream
> INNER JOIN data_1_bounded
> ON unbounded_stream.sequence = data_1_bounded.sequence_nb
> ),
> test_data_1_lookbehind AS (
> sender_login,
> LAST_VALUE(user_id) OVER previous_win AS user_id
> FROM test_data_1_unbounded
> WINDOW previous_win AS (
> PARTITION BY sender_login
> ORDER BY event_time ASC
> )
> )
> FROM test_data_1_lookbehind
> ;
> -- There are not enough rules to produce a node with desired properties:
> convention=ENUMERABLE. All the inputs have relevant nodes, however the cost
> is still infinite.
> -- Root: rel#29:RelSubset#4.ENUMERABLE
> -- Original rel:
> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
> {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
> -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER
> (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2,
> cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
> -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2,
> cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
> -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0,
> cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
> -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0,
> cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
> --
> -- Sets:
> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
> -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
> -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]),
> rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
> -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
> -- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
> rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login,
> VARCHAR user_id)
> -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
> -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]),
> rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
> -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
> -- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
> rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time,
> BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> -- rel#21:RelSubset#2.NONE, best=null
> --
> rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
> $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
> -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1,
> $2]), rowcount=1.2, cumulative cost={inf}
> --
> rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
> rowcount=1.2, cumulative cost={inf}

