recommended bigtable-hbase-beam version for SDK 2.50.0

2023-10-02 Thread BumKi Cho
Hi,

I am hoping to find information or a link that points me to which version
of bigtable-hbase-beam I should use with SDK 2.50.0 in Java.
Any help would be appreciated.

Thanks,
BumKi


Re: UDF/UDAF over complex structures

2023-10-02 Thread Wiśniowski Piotr

Hi Gyorgy,

I guess Your problem might not be directly related to UDFs nor UDAFs, but 
to the nested structure of Your data. I did have a problem with processing 
nested data and also did not find a way to work it out - especially if 
You are not in control of the input data structure.


These issues seem very related, as nested Rows need some polishing:

- https://github.com/apache/beam/issues/26911

- https://github.com/apache/beam/issues/27733

I have plans to jump on a fix for these two, and I guess it would not be 
hard, but I do not have any capacity for open source tasks for now. 
Would anyone like to take a look at them?


Best

Wiśniowski Piotr


On 28.09.2023 19:31, Balogh, György wrote:
Sorry, I was not specific enough. I meant using the SqlTransform 
registerUdf and registerUdaf. I use a lot of SQL in my pipeline and I 
would prefer using SQL UDFs in many cases over writing Beam 
transforms. I already have UDFs, but I did not find a way to make them 
work over nested structures.

Thank you,
Gyorgy

On Thu, Sep 28, 2023 at 5:40 PM Robert Bradshaw via user wrote:


Yes, for sure. This is one of the areas where Beam excels vs.
simpler tools like SQL. You can write arbitrary code to
iterate over arbitrary structures in the typical
Java/Python/Go/Typescript/Scala/[pick your language] way. In
Beam nomenclature, UDFs correspond to DoFns and UDAFs correspond
to CombineFns.
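
For instance, a minimal sketch of a DoFn that walks a nested Row (the
"address"/"city" schema here is made up for illustration):

```
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

// Walks a nested Row with ordinary Java code, the way a SQL UDF would.
// The "address" Row field and its "city" subfield are an assumed schema.
class ExtractCityFn extends DoFn<Row, String> {
  @ProcessElement
  public void processElement(@Element Row row, OutputReceiver<String> out) {
    Row address = row.getRow("address"); // descend into the nested Row
    if (address != null) {
      out.output(address.getString("city"));
    }
  }
}
```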

On Thu, Sep 28, 2023 at 4:23 AM Balogh, György wrote:

Hi,
I have a complex nested structure in my input data. Is it possible
to have a UDF/UDAF take a nested structure as input? I'm using
Java. Outputting a nested structure is also a question.
Thank you,
Gyorgy
-- 


György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 



--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 


[PYTHON] Yapf configurations to prevent workflow mangling

2023-10-02 Thread Joey Tran
Does anyone have any recommendations on how to get yapf to play nicely with
Beam workflows? Left to its own devices, it absolutely destroys the readability
of a workflow. A simple enough workaround is adding `# yapf: disable` comments
everywhere, but that's not really ideal. Are there any specific yapf knobs
recommended for mitigating this?
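
For context, the only knob I've found that looks directly relevant - since
pipelines chain the bitwise `|` operator - is split_before_bitwise_operator.
A sketch of a .style.yapf (the knobs are real yapf options, but this is
untested against real Beam workflows):

```
[style]
based_on_style = pep8
column_limit = 80
# Prefer breaking before '|' (and '&', '^') rather than after, which tends
# to keep each pipeline stage on its own line.
split_before_bitwise_operator = true
```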

Best,
Joey

-- 

Joey Tran | Senior Developer II | AutoDesigner TL

*he/him*

[image: Schrödinger, Inc.] 


Re: simplest way to do exponential moving average?

2023-10-02 Thread Reuven Lax via user
On Mon, Oct 2, 2023 at 2:00 AM Jan Lukavský wrote:

> Hi,
>
> this depends on how exactly you plan to calculate the average. The
> original definition is based on an exponentially decreasing weight for more
> distant (older, if time is on the x-axis) data points. This (technically)
> means that the average at any point X1 depends on all values X0 <= X1.
> It would therefore require buffering (using GroupByKey) all elements in
> a global window, doing the sorting manually, and then computing the new value
> of the average, triggering after each element. This is probably the
> technically correct, but most computationally intensive, variant.
>

To clarify - you would probably buffer the elements in OrderedListState,
and set periodic event-time timers to fetch them and compute the average.
OrderedListState will return the elements in order, so you wouldn't have to
sort. This is assuming you are talking about streaming pipelines.
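
A rough sketch of that stateful DoFn shape (the smoothing factor, flush
delay, and String key are illustrative, not from this thread):

```
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Buffers per-key values in OrderedListState and folds the EMA in timestamp
// order when an event-time timer fires.
class EmaFn extends DoFn<KV<String, Double>, Double> {
  private static final double ALPHA = 0.1; // illustrative smoothing factor

  @StateId("buffer")
  private final StateSpec<OrderedListState<Double>> bufferSpec =
      StateSpecs.orderedList(DoubleCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Double> element,
      @Timestamp Instant ts,
      @StateId("buffer") OrderedListState<Double> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(element.getValue(), ts));
    flush.set(ts.plus(Duration.standardMinutes(1))); // periodic-ish flush
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") OrderedListState<Double> buffer,
      OutputReceiver<Double> out) {
    Double ema = null;
    for (TimestampedValue<Double> tv : buffer.read()) { // already in timestamp order
      ema = (ema == null) ? tv.getValue() : ALPHA * tv.getValue() + (1 - ALPHA) * ema;
    }
    if (ema != null) {
      out.output(ema);
    }
  }
}
```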


> If the average is done over time intervals, then another option could be
> to define a cut-off interval T, i.e. set the exponentially vanishing weight
> of data points to zero at some T0 < T1 - T. If the data points
> come at some discrete time intervals (minutes, hours, days), then this
> could mean you can split the data into sliding time windows (the window
> interval being the cut-off interval, and the slide the update interval) and
> assign a weight to each data point in the particular time interval - i.e.
> how much weight the data point has at the end of the sliding
> window. With this you could then use a CombineFn to count and sum the
> weighted averages, which would be much more efficient.
>
> Best,
>
>  Jan
> On 9/30/23 17:08, Balogh, György wrote:
>
> Hi,
> I want to calculate the exponential moving average of a signal using Beam
> in Java.
> I understand there is no time order guarantee on incoming data. What would
> be the simplest solution for this?
> Thank you,
>
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>
>


Re: simplest way to do exponential moving average?

2023-10-02 Thread Kenneth Knowles
Just to be pedantic about it: Jan's approach is preferred because it would
be much more _parallel_. Any actual computation that depends on everything
being in order is by definition not parallel (nothing to do with Beam).

Kenn

On Mon, Oct 2, 2023 at 5:00 AM Jan Lukavský wrote:

> Hi,
>
> this depends on how exactly you plan to calculate the average. The
> original definition is based on an exponentially decreasing weight for more
> distant (older, if time is on the x-axis) data points. This (technically)
> means that the average at any point X1 depends on all values X0 <= X1.
> It would therefore require buffering (using GroupByKey) all elements in
> a global window, doing the sorting manually, and then computing the new value
> of the average, triggering after each element. This is probably the
> technically correct, but most computationally intensive, variant.
>
> If the average is done over time intervals, then another option could be
> to define a cut-off interval T, i.e. set the exponentially vanishing weight
> of data points to zero at some T0 < T1 - T. If the data points
> come at some discrete time intervals (minutes, hours, days), then this
> could mean you can split the data into sliding time windows (the window
> interval being the cut-off interval, and the slide the update interval) and
> assign a weight to each data point in the particular time interval - i.e.
> how much weight the data point has at the end of the sliding
> window. With this you could then use a CombineFn to count and sum the
> weighted averages, which would be much more efficient.
>
> Best,
>
>  Jan
> On 9/30/23 17:08, Balogh, György wrote:
>
> Hi,
> I want to calculate the exponential moving average of a signal using Beam
> in Java.
> I understand there is no time order guarantee on incoming data. What would
> be the simplest solution for this?
> Thank you,
>
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>
>


Re: [QUESTION] Why no auto labels?

2023-10-02 Thread Joey Tran
You don't have to specify the names if the callable you pass in is
/different/ for two `beam.Map`s, but if the callable is the same you must
specify a label. For example, the below will raise an exception:

```
| beam.Filter(identity_filter)
| beam.Filter(identity_filter)
```
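
whereas giving one of the applications an explicit label (the label text is
arbitrary) disambiguates them:

```
| beam.Filter(identity_filter)
| "FilterAgain" >> beam.Filter(identity_filter)
```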

Here's an example on playground that shows the error message you get [1]. I
marked every line I added with a "# ++".

It's a contrived example, but using a map or filter at the same pipeline
level probably comes up often, at least in my experience. For example,
you might have a pipeline that partitions a pcoll into three different
pcolls, runs some transforms on them, and then runs the same type of filter
on each of them.

The case that happens most often for me is using the `assert_that` [2]
testing transform. In this case, I think users will often have no real
need for a disambiguating label, as they're just writing unit tests
that test a few different properties of their workflow.

[1] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
[2]
https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.testing.util.html#apache_beam.testing.util.assert_that

On Mon, Oct 2, 2023 at 9:08 AM Bruno Volpato via user wrote:

> If I understand the question correctly, you don't have to specify those
> names.
>
> As Reuven pointed out, it is probably a good idea so you have a stable /
> deterministic graph.
> But in the Python SDK, you can simply use pcollection | map_fn, instead
> of pcollection | 'Map' >> map_fn.
>
> See an example here
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/group_with_coder.py#L100-L116
>
>
> On Sun, Oct 1, 2023 at 9:08 PM Joey Tran wrote:
>
>> Hmm, I'm not sure what you mean by "updating pipelines in place". Can you
>> elaborate?
>>
>> I forgot to mention my question is posed from the context of a python SDK
>> user, and afaict, there doesn't seem to be an obvious way to autogenerate
>> names/labels. Hearing that the java SDK supports it makes me wonder if the
>> python SDK could support it as well though... (If so, I'd be happy to
>> implement it). Currently, it's fairly tedious to have to name every
>> instance of a transform that you might reuse in a pipeline, e.g. when
>> reapplying the same Map on different pcollections.
>>
>> On Sun, Oct 1, 2023 at 8:12 PM Reuven Lax via user wrote:
>>
>>> Are you talking about transform names? The main reason was because for
>>> runners that support updating pipelines in place, the only way to do so
>>> safely is if the runner can perfectly identify which transforms in the new
>>> graph match the ones in the old graph. There's no good way to auto generate
>>> names that will stay stable across updates - even small changes to the
>>> pipeline might change the order of nodes in the graph, which could result
>>> in a corrupted update.
>>>
>>> However, if you don't care about update, Beam can auto generate these
>>> names for you! When you call PCollection.apply (if using Beam Java), simply
>>> omit the name parameter and Beam will auto generate a unique name for you.
>>>
>>> Reuven
>>>
>>> On Sat, Sep 30, 2023 at 11:54 AM Joey Tran wrote:
>>>
 After writing a few pipelines now, I keep getting tripped up by
 accidentally having duplicate labels from using multiple of the same
 transform without labeling them. I figure this must be a common complaint,
 so I was just curious what the rationale behind this design was. My naive
 thought off the top of my head is that it'd be more user-friendly to just
 auto-increment duplicate transforms, but I figure I must be missing
 something.

 Cheers,
 Joey

>>>


Re: [QUESTION] Why no auto labels?

2023-10-02 Thread Bruno Volpato via user
If I understand the question correctly, you don't have to specify those
names.

As Reuven pointed out, it is probably a good idea so you have a stable /
deterministic graph.
But in the Python SDK, you can simply use pcollection | map_fn, instead of
pcollection | 'Map' >> map_fn.

See an example here
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/group_with_coder.py#L100-L116


On Sun, Oct 1, 2023 at 9:08 PM Joey Tran wrote:

> Hmm, I'm not sure what you mean by "updating pipelines in place". Can you
> elaborate?
>
> I forgot to mention my question is posed from the context of a python SDK
> user, and afaict, there doesn't seem to be an obvious way to autogenerate
> names/labels. Hearing that the java SDK supports it makes me wonder if the
> python SDK could support it as well though... (If so, I'd be happy to
> implement it). Currently, it's fairly tedious to have to name every
> instance of a transform that you might reuse in a pipeline, e.g. when
> reapplying the same Map on different pcollections.
>
> On Sun, Oct 1, 2023 at 8:12 PM Reuven Lax via user wrote:
>
>> Are you talking about transform names? The main reason was because for
>> runners that support updating pipelines in place, the only way to do so
>> safely is if the runner can perfectly identify which transforms in the new
>> graph match the ones in the old graph. There's no good way to auto generate
>> names that will stay stable across updates - even small changes to the
>> pipeline might change the order of nodes in the graph, which could result
>> in a corrupted update.
>>
>> However, if you don't care about update, Beam can auto generate these
>> names for you! When you call PCollection.apply (if using Beam Java), simply
>> omit the name parameter and Beam will auto generate a unique name for you.
>>
>> Reuven
>>
>> On Sat, Sep 30, 2023 at 11:54 AM Joey Tran wrote:
>>
>>> After writing a few pipelines now, I keep getting tripped up by
>>> accidentally having duplicate labels from using multiple of the same
>>> transform without labeling them. I figure this must be a common complaint,
>>> so I was just curious what the rationale behind this design was. My naive
>>> thought off the top of my head is that it'd be more user-friendly to just
>>> auto-increment duplicate transforms, but I figure I must be missing
>>> something.
>>>
>>> Cheers,
>>> Joey
>>>
>>


Re: simplest way to do exponential moving average?

2023-10-02 Thread Jan Lukavský

Hi,

this depends on how exactly you plan to calculate the average. The 
original definition is based on an exponentially decreasing weight for more 
distant (older, if time is on the x-axis) data points. This (technically) 
means that the average at any point X1 depends on all values X0 <= X1. 
It would therefore require buffering (using GroupByKey) all elements 
in a global window, doing the sorting manually, and then computing the new 
value of the average, triggering after each element. This is probably the 
technically correct, but most computationally intensive, variant.


If the average is done over time intervals, then another option could 
be to define a cut-off interval T, i.e. set the exponentially vanishing 
weight of data points to zero at some T0 < T1 - T. If the 
data points come at some discrete time intervals (minutes, hours, days), 
then this could mean you can split the data into sliding time windows 
(the window interval being the cut-off interval, and the slide the update 
interval) and assign a weight to each data point in the particular time 
interval - i.e. how much weight the data point has at the 
end of the sliding window. With this you could then use a CombineFn to 
count and sum the weighted averages, which would be much more efficient.
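
A rough sketch of the CombineFn half of that idea, assuming an upstream step
has already attached the decaying weight to each value (the KV<weight, value>
encoding is just one possible choice):

```
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;

// Combines (weight, value) pairs into a weighted average. An upstream DoFn
// is assumed to have computed each element's exponentially decaying weight
// relative to the end of its sliding window.
class WeightedAverageFn
    extends Combine.CombineFn<KV<Double, Double>, KV<Double, Double>, Double> {
  @Override
  public KV<Double, Double> createAccumulator() {
    return KV.of(0.0, 0.0); // (sum of weight * value, sum of weights)
  }

  @Override
  public KV<Double, Double> addInput(
      KV<Double, Double> acc, KV<Double, Double> weightAndValue) {
    double weight = weightAndValue.getKey();
    double value = weightAndValue.getValue();
    return KV.of(acc.getKey() + weight * value, acc.getValue() + weight);
  }

  @Override
  public KV<Double, Double> mergeAccumulators(Iterable<KV<Double, Double>> accs) {
    double weightedSum = 0, weightSum = 0;
    for (KV<Double, Double> acc : accs) {
      weightedSum += acc.getKey();
      weightSum += acc.getValue();
    }
    return KV.of(weightedSum, weightSum);
  }

  @Override
  public Double extractOutput(KV<Double, Double> acc) {
    return acc.getValue() == 0 ? 0.0 : acc.getKey() / acc.getValue();
  }
}
```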


Best,

 Jan

On 9/30/23 17:08, Balogh, György wrote:

Hi,
I want to calculate the exponential moving average of a signal using 
Beam in Java.
I understand there is no time order guarantee on incoming data. What 
would be the simplest solution for this?

Thank you,

--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 


Re: EFO KinesisIO watermarking doubt

2023-10-02 Thread Sachin Mittal
Hi,
I have filed an issue: https://github.com/apache/beam/issues/28760
I have also created a PR (based on our local fix for this):
https://github.com/apache/beam/pull/28763
This can serve as a start.

Thanks
Sachin
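
For reference, a minimal sketch of the timestamp re-assignment approach
Pavel describes below; the payload-parsing helper is hypothetical:

```
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;

class EventTimeStamping {
  // Hypothetical helper: decode the payload and pull out the embedded time.
  static Instant parseEventTime(byte[] payload) {
    // ... application-specific decoding goes here ...
    return Instant.now(); // placeholder
  }

  // Re-assigns each element's timestamp from a field embedded in the record.
  // If the embedded time can lag the Kinesis arrival time, the watermark must
  // account for that, or such elements risk being treated as late.
  static PCollection<KinesisRecord> stampWithEmbeddedTime(
      PCollection<KinesisRecord> records) {
    return records.apply(
        WithTimestamps.of((KinesisRecord r) -> parseEventTime(r.getDataAsBytes())));
  }
}
```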


On Mon, Oct 2, 2023 at 2:54 AM Pavel Solomin wrote:

> Hello, sorry for the late reply.
>
> EFOKinesisReader implemented the same timestamp logic the non-EFO
> KinesisReader had. At the time of the EFO implementation, a more careful
> evaluation of the records' timestamps was out of scope.
>
> Can you please create an issue at https://github.com/apache/beam/issues?
> With an issue we can track this investigation, which may become a new PR or
> some clarifications in the IO documentation.
>
> > We wanted the current timestamp based on some custom time embedded
> within the record and not the approximate arrival time, and we are not sure
> how we can achieve that.
>
> KinesisIO outputs only the byte[] of a message payload, without any decoding.
> If your timestamps sit in the messages' payload, I think this approach
> should work:
> https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>
>
>
>
>
> On Fri, 21 Jul 2023 at 07:19, Sachin Mittal  wrote:
>
>> Hi,
>> We are implementing the EFO Kinesis IO reader provided by Apache Beam.
>> I see in the code that, for the implementation of getCurrentTimestamp, we
>> always return getApproximateArrivalTimestamp and not the event time
>> which we may have set for that record using withCustomWatermarkPolicy.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
>>
>> However, for KafkaIO we do something different:
>> we always get the getCurrentTimestamp based on the `timestampPolicy` set for
>> Kafka, where the user can emit a custom timestamp associated with each record.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
>>
>> So why is there a difference in these two implementations?
>>
>> We wanted the current timestamp based on some custom time embedded within
>> the record and not the approximate arrival time, and we are not sure how we
>> can achieve that.
>>
>> Please let us know if there is a way to achieve this for Kinesis.
>>
>> Thanks
>> Sachin
>>
>>