Re: SQL Windowing

2020-05-28 Thread Maximilian Michels
Thanks for the quick reply Brian! I've filed a JIRA for option (a):
https://jira.apache.org/jira/browse/BEAM-10143

Makes sense to define DATETIME as a logical type. I'll check out your
PR. We could work around this for now by doing a cast, e.g.:

  TUMBLE(CAST(f_timestamp AS DATETIME), INTERVAL '30' MINUTE)

Note that we may have to do a more sophisticated cast to convert the
Python micros into a DATETIME.
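For illustration, a minimal sketch of that conversion (assuming the Python
side carries the timestamp as int64 epoch microseconds; the helper name here
is hypothetical, not an existing Beam API):

```python
from datetime import datetime, timedelta, timezone

def micros_to_datetime(micros):
    # Convert int64 epoch microseconds to a timezone-aware datetime,
    # splitting off the sub-second part to avoid float precision loss.
    seconds, remainder = divmod(micros, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
        microseconds=remainder)
```

A field converted this way could then be fed to TUMBLE without the CAST.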

-Max

On 28.05.20 19:18, Brian Hulette wrote:
> Hey Max,
> Thanks for kicking the tires on SqlTransform in Python :)
> 
> We don't have any tests of windowing and SQL in Python yet, so I'm not
> that surprised you're running into issues here. Portable schemas don't
> support the DATETIME type, because we decided not to define it as one of
> the atomic types [1] and hope to add support via a logical type instead
> (see BEAM-7554 [2]). This was the motivation for the MillisInstant PR I
> put up, and the ongoing discussion [3].
> Regardless, that should only be an obstacle for option (b), where you'd
> need to have a DATETIME in the input and/or output PCollection of the
> SqlTransform. In theory option (a) should be possible, so I'd consider
> that a bug - can you file a jira for it?
> 
> Brian
> 
> [1] 
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto#L58
> [2] https://issues.apache.org/jira/browse/BEAM-7554
> [3] 
> https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E
> 
> On Thu, May 28, 2020 at 9:45 AM Maximilian Michels wrote:
> 
> Hi,
> 
> I'm using the SqlTransform as an external transform from within a Python
> pipeline. The SQL docs [1] mention that you can either (a) window the
> input or (b) window in the SQL query.
> 
> Option (a):
> 
>   input
>       | "Window" >> beam.WindowInto(window.FixedWindows(30))
>       | "Aggregate" >>
>       SqlTransform("""Select field, count(field) from PCOLLECTION
>                       WHERE ...
>                       GROUP BY field
>                    """)
> 
> This results in an exception:
> 
>   Caused by: java.lang.ClassCastException:
>   org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
>   to org.apache.beam.sdk.transforms.windowing.GlobalWindow
> 
> => Is this a bug?
> 
> 
> Let's try Option (b):
> 
>   input
>       | "Aggregate & Window" >>
>       SqlTransform("""Select field, count(field) from PCOLLECTION
>                       WHERE ...
>                       GROUP BY field,
>                                TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                    """)
> 
> The issue that I'm facing here is that the timestamp is already assigned
> to my values but is not exposed as a field. So I need to use a DoFn to
> extract the timestamp as a new field:
> 
>   class GetTimestamp(beam.DoFn):
>     def process(self, event, timestamp=beam.DoFn.TimestampParam):
>       yield TimestampedRow(..., timestamp)
> 
>   input
>       | "Extract timestamp" >>
>       beam.ParDo(GetTimestamp())
>       | "Aggregate & Window" >>
>       SqlTransform("""Select field, count(field) from PCOLLECTION
>                       WHERE ...
>                       GROUP BY field,
>                                TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                    """)
> 
> => It would be very convenient if there was a reserved field name which
> would point to the timestamp of an element. Maybe there is?
> 
> 
> -Max
> 
> 
> [1]
> 
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
> 


Re: SQL Windowing

2020-05-28 Thread Brian Hulette
Hey Max,
Thanks for kicking the tires on SqlTransform in Python :)

We don't have any tests of windowing and SQL in Python yet, so I'm not that
surprised you're running into issues here. Portable schemas don't support
the DATETIME type, because we decided not to define it as one of the atomic
types [1] and hope to add support via a logical type instead (see BEAM-7554
[2]). This was the motivation for the MillisInstant PR I put up, and the
ongoing discussion [3].
Regardless, that should only be an obstacle for option (b), where you'd
need to have a DATETIME in the input and/or output PCollection of the
SqlTransform. In theory option (a) should be possible, so I'd consider that
a bug - can you file a jira for it?

Brian

[1]
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto#L58
[2] https://issues.apache.org/jira/browse/BEAM-7554
[3]
https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E

On Thu, May 28, 2020 at 9:45 AM Maximilian Michels  wrote:

> Hi,
>
> I'm using the SqlTransform as an external transform from within a Python
> pipeline. The SQL docs [1] mention that you can either (a) window the
> input or (b) window in the SQL query.
>
> Option (a):
>
>   input
>       | "Window" >> beam.WindowInto(window.FixedWindows(30))
>       | "Aggregate" >>
>         SqlTransform("""Select field, count(field) from PCOLLECTION
>                         WHERE ...
>                         GROUP BY field
>                      """)
>
> This results in an exception:
>
>   Caused by: java.lang.ClassCastException:
>   org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
>   to org.apache.beam.sdk.transforms.windowing.GlobalWindow
>
> => Is this a bug?
>
>
> Let's try Option (b):
>
>   input
>       | "Aggregate & Window" >>
>         SqlTransform("""Select field, count(field) from PCOLLECTION
>                         WHERE ...
>                         GROUP BY field,
>                                  TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                      """)
>
> The issue that I'm facing here is that the timestamp is already assigned
> to my values but is not exposed as a field. So I need to use a DoFn to
> extract the timestamp as a new field:
>
>   class GetTimestamp(beam.DoFn):
>     def process(self, event, timestamp=beam.DoFn.TimestampParam):
>       yield TimestampedRow(..., timestamp)
>
>   input
>       | "Extract timestamp" >>
>         beam.ParDo(GetTimestamp())
>       | "Aggregate & Window" >>
>         SqlTransform("""Select field, count(field) from PCOLLECTION
>                         WHERE ...
>                         GROUP BY field,
>                                  TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                      """)
>
> => It would be very convenient if there was a reserved field name which
> would point to the timestamp of an element. Maybe there is?
>
>
> -Max
>
>
> [1]
>
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
>


SQL Windowing

2020-05-28 Thread Maximilian Michels
Hi,

I'm using the SqlTransform as an external transform from within a Python
pipeline. The SQL docs [1] mention that you can either (a) window the
input or (b) window in the SQL query.

Option (a):

  input
      | "Window" >> beam.WindowInto(window.FixedWindows(30))
      | "Aggregate" >>
        SqlTransform("""Select field, count(field) from PCOLLECTION
                        WHERE ...
                        GROUP BY field
                     """)

This results in an exception:

  Caused by: java.lang.ClassCastException:
  org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
  to org.apache.beam.sdk.transforms.windowing.GlobalWindow

=> Is this a bug?
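As background for the error above: FixedWindows places each element into an
IntervalWindow keyed by its timestamp, so any downstream step that expects
the single GlobalWindow will fail on the cast. A plain-Python sketch of the
bucketing (not Beam's actual implementation; timestamps and the window size
are in seconds here):

```python
def fixed_window(timestamp, size=30):
    # Assign a timestamp to its half-open fixed window
    # [start, start + size) -- the bucketing FixedWindows(size) performs.
    start = timestamp - (timestamp % size)
    return (start, start + size)
```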


Let's try Option (b):

  input
      | "Aggregate & Window" >>
        SqlTransform("""Select field, count(field) from PCOLLECTION
                        WHERE ...
                        GROUP BY field,
                                 TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                     """)

The issue that I'm facing here is that the timestamp is already assigned
to my values but is not exposed as a field. So I need to use a DoFn to
extract the timestamp as a new field:

  class GetTimestamp(beam.DoFn):
    def process(self, event, timestamp=beam.DoFn.TimestampParam):
      yield TimestampedRow(..., timestamp)

  input
      | "Extract timestamp" >>
        beam.ParDo(GetTimestamp())
      | "Aggregate & Window" >>
        SqlTransform("""Select field, count(field) from PCOLLECTION
                        WHERE ...
                        GROUP BY field,
                                 TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                     """)
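The reshaping GetTimestamp does can be sketched in plain Python (data shape
only, not Beam execution; `TimestampedRow` here is a hypothetical stand-in
for whatever schema'd row type is actually used):

```python
from collections import namedtuple

# Hypothetical schema'd row that exposes the element timestamp as an
# ordinary field, so the SQL query can reference it in TUMBLE(f_timestamp, ...).
TimestampedRow = namedtuple("TimestampedRow", ["field", "f_timestamp"])

def attach_timestamp(event, timestamp):
    # Mirror of GetTimestamp.process: pair the value with its element
    # timestamp as a regular row field.
    return TimestampedRow(field=event, f_timestamp=timestamp)
```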

=> It would be very convenient if there was a reserved field name which
would point to the timestamp of an element. Maybe there is?


-Max


[1]
https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/


Re: [SQL] Windowing and triggering changes proposal

2018-01-19 Thread Tyler Akidau
I'm late to the party as usual, but also added some comments. Overall
supportive of this work. Thanks for the clear analysis, Anton.

On Tue, Jan 16, 2018 at 10:58 AM Mingmin Xu  wrote:

> Thanks @Anton for the proposal. Window(w/ trigger) support in SQL is
> limited now, you're very welcome to join the improvement.
>
> There's a balance between the injected DSL mode and the CLI mode that we
> struck when implementing Beam SQL overall, not only windowing. Many default
> behaviors were introduced to make it workable in a pure SQL CLI scenario.
> If they limit the potential of the DSL mode, we should absolutely adjust
> them.
>
> Mingmin
>
> On Tue, Jan 16, 2018 at 9:56 AM, Kenneth Knowles  wrote:
>
>> I've commented on the doc. This is a really nice analysis and I think the
>> proposal is good for making SQL work with Beam windowing and triggering in
>> a way that will make sense to users.
>>
>> Kenn
>>
>> On Thu, Jan 11, 2018 at 4:05 PM, Anton Kedin  wrote:
>>
>>> Hi,
>>>
>>> Wanted to gather feedback on changes I propose to the behavior of some
>>> aspects of windowing and triggering in Beam SQL.
>>>
>>> In short:
>>>
>>> Beam SQL currently overrides input PCollections' windowing/triggering
>>> configuration in a few cases. For example, if a query has a simple GROUP
>>> BY clause, we would apply GlobalWindows. This is not configurable by the
>>> user; it happens under the hood of SQL.
>>>
>>> The proposal is to update the Beam SQL implementation in these cases to
>>> avoid changing the input PCollections' configuration as much as possible.
>>>
>>> More details here:
>>> https://docs.google.com/document/d/1RmyV9e1Qab-axsLI1WWpw5oGAJDv0X7y9OSnPnrZWJk
>>>
>>> Regards,
>>> Anton
>>>
>>
>>
>
>
> --
> 
> Mingmin
>


Re: [SQL] Windowing and triggering changes proposal

2018-01-16 Thread Mingmin Xu
Thanks @Anton for the proposal. Window(w/ trigger) support in SQL is
limited now, you're very welcome to join the improvement.

There's a balance between the injected DSL mode and the CLI mode that we
struck when implementing Beam SQL overall, not only windowing. Many default
behaviors were introduced to make it workable in a pure SQL CLI scenario.
If they limit the potential of the DSL mode, we should absolutely adjust
them.

Mingmin

On Tue, Jan 16, 2018 at 9:56 AM, Kenneth Knowles  wrote:

> I've commented on the doc. This is a really nice analysis and I think the
> proposal is good for making SQL work with Beam windowing and triggering in
> a way that will make sense to users.
>
> Kenn
>
> On Thu, Jan 11, 2018 at 4:05 PM, Anton Kedin  wrote:
>
>> Hi,
>>
>> Wanted to gather feedback on changes I propose to the behavior of some
>> aspects of windowing and triggering in Beam SQL.
>>
>> In short:
>>
>> Beam SQL currently overrides input PCollections' windowing/triggering
>> configuration in a few cases. For example, if a query has a simple GROUP
>> BY clause, we would apply GlobalWindows. This is not configurable by the
>> user; it happens under the hood of SQL.
>>
>> The proposal is to update the Beam SQL implementation in these cases to
>> avoid changing the input PCollections' configuration as much as possible.
>>
>> More details here:
>> https://docs.google.com/document/d/1RmyV9e1Qab-axsLI1WWpw5oGAJDv0X7y9OSnPnrZWJk
>>
>> Regards,
>> Anton
>>
>
>


-- 

Mingmin


Re: [SQL] Windowing and triggering changes proposal

2018-01-16 Thread Kenneth Knowles
I've commented on the doc. This is a really nice analysis and I think the
proposal is good for making SQL work with Beam windowing and triggering in
a way that will make sense to users.

Kenn

On Thu, Jan 11, 2018 at 4:05 PM, Anton Kedin  wrote:

> Hi,
>
> Wanted to gather feedback on changes I propose to the behavior of some
> aspects of windowing and triggering in Beam SQL.
>
> In short:
>
> Beam SQL currently overrides input PCollections' windowing/triggering
> configuration in a few cases. For example, if a query has a simple GROUP
> BY clause, we would apply GlobalWindows. This is not configurable by the
> user; it happens under the hood of SQL.
>
> The proposal is to update the Beam SQL implementation in these cases to
> avoid changing the input PCollections' configuration as much as possible.
>
> More details here:
> https://docs.google.com/document/d/1RmyV9e1Qab-axsLI1WWpw5oGAJDv0X7y9OSnPnrZWJk
>
> Regards,
> Anton
>


[SQL] Windowing and triggering changes proposal

2018-01-11 Thread Anton Kedin
Hi,

Wanted to gather feedback on changes I propose to the behavior of some
aspects of windowing and triggering in Beam SQL.

In short:

Beam SQL currently overrides input PCollections' windowing/triggering
configuration in a few cases. For example, if a query has a simple GROUP BY
clause, we would apply GlobalWindows. This is not configurable by the user;
it happens under the hood of SQL.

The proposal is to update the Beam SQL implementation in these cases to
avoid changing the input PCollections' configuration as much as possible.

More details here:
https://docs.google.com/document/d/1RmyV9e1Qab-axsLI1WWpw5oGAJDv0X7y9OSnPnrZWJk

Regards,
Anton