Re: "Decorator" pattern for PTransforms

2023-09-15 Thread Reuven Lax via user
Creating composite DoFns is tricky today due to how they are implemented (via annotated methods). However, providing such a method to compose DoFns would be very useful IMO. On Fri, Sep 15, 2023 at 9:33 AM Joey Tran wrote: > Yeah for (1) the concern would be adding a shuffle/fusion break and (2)

Re: "Decorator" pattern for PTransforms

2023-09-15 Thread Reuven Lax via user
Correct - I was referring to Java. On Fri, Sep 15, 2023 at 9:55 AM Robert Bradshaw wrote: > On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user > wrote: > >> Creating composite DoFns is tricky today due to how they are implemented >> (via annotated methods). >>

Re: How can we get multiple side inputs from a single pipeline ?

2023-08-28 Thread Reuven Lax via user
This looks fine. One caveat: there currently appears to be a bug in Beam when you apply a combiner followed by View.asSingleton. I would recommend replacing these lines: .apply(Latest.globally()) .apply(View.asSingleton()) With the following: .apply(Reify.timestamps())

Re: [QUESTION] Why no auto labels?

2023-10-01 Thread Reuven Lax via user
Are you talking about transform names? The main reason was that for runners that support updating pipelines in place, the only way to do so safely is if the runner can perfectly identify which transforms in the new graph match the ones in the old graph. There's no good way to auto generate

Re: @FieldAccess parameter types not being enforced vs corresponding schema types in Java DoFn

2023-09-18 Thread Reuven Lax via user
Good question - I know it will be enforced at runtime (I think you'll get a ClassCastException if things don't match) but I'd have to check to see if we enforce it at graph-submission time. If we don't have this validation in place, adding it would be an improvement. On Mon, Sep 18, 2023 at 3:04 

Re: simplest way to do exponential moving average?

2023-10-02 Thread Reuven Lax via user
On Mon, Oct 2, 2023 at 2:00 AM Jan Lukavský wrote: > Hi, > > this depends on how exactly you plan to calculate the average. The > original definition is based on exponentially decreasing weight of more > distant (older if time is on the x-axis) data points. This (technically) > means that this
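
One common way to realize "exponentially decreasing weight of more distant data points" is a time-decayed average where a point observed Δt ago carries weight exp(-Δt/τ). The plain-Java sketch below assumes that definition, in-order timestamps, and a hypothetical class `TimeDecayedEma` with time constant `tauMillis`; it is not a Beam API. In a pipeline, the two running sums would typically be kept per key in a stateful DoFn, and out-of-order arrival (which the sketch rejects) is exactly the part that needs extra care.

```java
/** Hypothetical helper: time-decayed exponential moving average (not a Beam API). */
class TimeDecayedEma {
    private final double tauMillis;     // decay time constant (assumed parameter)
    private double weightedSum = 0.0;   // sum of x_i * exp(-(tLast - t_i) / tau)
    private double totalWeight = 0.0;   // sum of exp(-(tLast - t_i) / tau)
    private long lastTimestampMillis = Long.MIN_VALUE;

    TimeDecayedEma(double tauMillis) {
        if (tauMillis <= 0) throw new IllegalArgumentException("tau must be positive");
        this.tauMillis = tauMillis;
    }

    /** Adds a point and returns the current average; assumes non-decreasing timestamps. */
    double add(long timestampMillis, double value) {
        if (lastTimestampMillis != Long.MIN_VALUE) {
            if (timestampMillis < lastTimestampMillis) {
                throw new IllegalArgumentException("out-of-order timestamp");
            }
            // Age every previously seen point by the elapsed time in one multiplication.
            double decay = Math.exp(-(timestampMillis - lastTimestampMillis) / tauMillis);
            weightedSum *= decay;
            totalWeight *= decay;
        }
        weightedSum += value;   // the newest point has weight exp(0) = 1
        totalWeight += 1.0;
        lastTimestampMillis = timestampMillis;
        return average();
    }

    /** Weighted mean of all points seen so far, or NaN if there are none. */
    double average() {
        return totalWeight == 0.0 ? Double.NaN : weightedSum / totalWeight;
    }

    public static void main(String[] args) {
        TimeDecayedEma ema = new TimeDecayedEma(60_000); // tau = 1 minute
        ema.add(0, 10.0);
        ema.add(60_000, 20.0);
        System.out.println(ema.average()); // closer to 20 than to 10
    }
}
```

Keeping only the two decayed sums means each update is O(1) and the state per key stays constant-sized, which is what makes this form convenient for keyed state.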

Re: Generating Heartbeats Using Looping Timer

2022-07-09 Thread Reuven Lax via user
On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra wrote: > Maybe the previous post was too verbose so I will try to summarize my > question - > If one instance of DoFn tries to set a timer for a time which is behind > the pipeline's watermark, can this cause the pipeline to stall for other > keys as

Re: GroupIntoBatches not working on Flink?

2022-07-26 Thread Reuven Lax via user
This might be a bug in the Flink runner, because it is implemented here. On Tue, Jul 26, 2022 at 9:14 AM Cristian

Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-29 Thread Reuven Lax via user
Google Cloud Dataflow does support snapshots. Is this what you were looking for? On Mon, Aug 29, 2022 at 4:04 PM Kenneth Knowles wrote: > Hi Will, David, > > I think you'll find the best source of answer for this sort of question

Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-30 Thread Reuven Lax via user
Snapshots are expected to happen nearly instantaneously. Processing is paused while the snapshot is in progress, but the pause should usually be very brief. It's true that Dataflow does not support automated snapshots - you would have to create them yourself using a cron job. Checkpoints on Flink

Re: Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Reuven Lax via user
PCollections are usually persistent within a pipeline, so you can reuse them in other parts of a pipeline with no problem. There is no notion of state across pipelines - every pipeline is independent. If you want state across pipelines you can write the PCollection out to a set of files which

Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
> will those messages(A) get consumed again from Kafka or will the messages > get recovered from the checkpoint and retried in that specific operator? > > On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user > wrote: > >> >> >> On Sun, Sep 25, 2022 at 4:56 AM Yo

Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva wrote: > Hi all, > > I have started using KafkaIO to read a data stream and have the following > questions. Appreciate it if you could provide a few clarifications on the > following. > > 1. Does KafkaIO ignore the offset stored in the broker and

Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
It's not public because it was added for use in unit tests, and modifying this value can have very unexpected results (e.g. making it smaller can trigger a completely different codepath that is triggered when there are too many files, leading to unexpected cost increases in the pipeline). Out of

Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
The default max file size is 4 TiB. BigQuery supports files up to 5 TiB, but there might be some slop in our file-size estimation, which is why Beam set a slightly lower limit. In any case, you won't be able to increase that value by too much, or BigQuery will reject the load job. The default max
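
The margin described above can be made concrete with a little arithmetic. The 4 TiB default and the 5 TiB BigQuery limit are taken from this message; treating the gap as a tolerance for size *underestimation* (the writer stops at the cap according to its estimate, while the real file may be larger) is an assumption for illustration, and `LoadFileSizeLimits` is a hypothetical name.

```java
/** Hypothetical constants illustrating the headroom between Beam's default cap and BigQuery's limit. */
class LoadFileSizeLimits {
    static final long TIB = 1L << 40;              // 1 TiB = 2^40 bytes
    static final long BEAM_DEFAULT_MAX = 4 * TIB;  // default max file size quoted in the thread
    static final long BIGQUERY_MAX = 5 * TIB;      // BigQuery limit quoted in the thread

    /**
     * If a file is closed once its *estimated* size reaches `cap`, its real size is
     * cap * (1 + e) for an estimation error e. This returns the largest e that still
     * keeps the real size at or below `hardLimit`.
     */
    static double tolerableUnderestimate(long cap, long hardLimit) {
        return (double) hardLimit / cap - 1.0;
    }

    public static void main(String[] args) {
        System.out.println(BEAM_DEFAULT_MAX);                 // 4398046511104
        System.out.println(BIGQUERY_MAX - BEAM_DEFAULT_MAX);  // 1099511627776 (1 TiB of headroom)
        System.out.println(tolerableUnderestimate(BEAM_DEFAULT_MAX, BIGQUERY_MAX)); // 0.25
    }
}
```

Under this model, the estimator may undercount by up to 25% before a load file would exceed BigQuery's limit, which also shows why raising the cap much above 4 TiB quickly erases that safety margin.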

Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
- I think we should support mixing and matching SchemaProviders > for nested types. > > [1] https://github.com/apache/beam/issues/20359 > > On Thu, Aug 4, 2022 at 2:45 PM Reuven Lax via user > wrote: > >> We do have JavaBeanSchema which might work, depending on wheth

Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
We do have JavaBeanSchema, which might work, depending on whether your Thrift class conforms to the JavaBeans conventions. On Thu, Aug 4, 2022 at 2:06 PM Binh Nguyen Van wrote: > Hi, > > I have an AutoValue class and it looks like this > > @AutoValue > @DefaultSchema( AutoValueSchema.class ) > public abstract

Re: Beam saves filepaths in Flink's state

2022-12-08 Thread Reuven Lax via user
This doesn't sound ideal to me. For contrast, Dataflow doesn't save any of these things (coders, transforms, configs) in state, which makes it easier for Dataflow to update pipelines. On Thu, Dec 8, 2022 at 7:48 AM Cristian Constantinescu wrote: > Hi everyone, > > I noticed that the Flink state

Re: Single side input to multiple transforms

2022-11-07 Thread Reuven Lax via user
Is this a Python job? On Mon, Nov 7, 2022 at 12:38 AM Binh Nguyen Van wrote: > Hi, > > I am writing a pipeline where I have one singleton side input that I want > to use in multiple different transforms. When I run the pipeline in Google > Dataflow I see multiple entries in the logs that have a

Re: Why is FlatMap different from composing Flatten and Map?

2023-03-15 Thread Reuven Lax via user
In Apache Beam, Flatten is a union operation - it takes multiple PCollections (of the same type) and merges them into a single PCollection. On Mon, Mar 13, 2023 at 11:32 AM Godefroy Clair wrote: > Hi, > I am wondering about the way `Flatten()` and `FlatMap()` are implemented > in Apache Beam
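
As a plain-Java analogy (not Beam code; `FlattenVsFlatMap` and its methods are hypothetical names), the difference resembles concatenating several same-typed lists versus mapping each element to zero or more outputs and concatenating the results. Unlike these Lists, a PCollection has no defined ordering, so only the contents, not the order, carry over to Beam.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class FlattenVsFlatMap {
    /** Analogue of Flatten: a union of several collections of the same element type. */
    @SafeVarargs
    static <T> List<T> flatten(List<T>... collections) {
        return Arrays.stream(collections)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    /** Analogue of FlatMap: each input element produces zero or more outputs, all concatenated. */
    static <T, R> List<R> flatMap(List<T> input, Function<T, List<R>> fn) {
        return input.stream()
                .flatMap(x -> fn.apply(x).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> a = List.of("a b", "c");
        List<String> b = List.of("d");
        System.out.println(flatten(a, b));                                 // [a b, c, d]
        System.out.println(flatMap(a, s -> Arrays.asList(s.split(" "))));  // [a, b, c]
    }
}
```

Flatten takes several collections and applies no function; FlatMap takes one collection and a function. Composing a Map with a Flatten therefore does not reproduce FlatMap, because Flatten merges PCollections rather than the iterables inside a single PCollection.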

Re: Successful Inserts for Storage Write API?

2023-03-02 Thread Reuven Lax via user
Are you trying to do this in order to use Wait.on? getSuccessfulInserts is not currently supported for Storage Write API. On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang wrote: > Thank you to Ahmed and Reuven for the tip on > WriteResult::getFailedStorageApiInserts. > > When I tried to get the

Re: Deduplicate usage

2023-03-02 Thread Reuven Lax via user
State is per-key, and keys are distributed across workers. Two workers should not be working on the same state. On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van wrote: > Thank you Ankur, > > This is the current source code of Deduplicate transform. > > Boolean seen = seenState.read(); >
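
A single-threaded plain-Java simulation of the point above: each key is routed to exactly one worker, so the `seen` flag for a key lives in exactly one worker's state and no two workers ever read or write it. The hash-based routing, the class name `KeyedDedupSimulation`, and the use of a `Map` as per-worker state are assumptions for illustration; real runners process keys concurrently and use their own sharding.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedDedupSimulation {
    /** Assigns a key to one of numWorkers workers; the same key always maps to the same worker. */
    static int workerFor(Object key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    /** Runs a Deduplicate-style "seen" check where each worker holds state only for its own keys. */
    static List<String> dedup(List<String> keys, int numWorkers) {
        List<Map<String, Boolean>> seenStatePerWorker = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            seenStatePerWorker.add(new HashMap<>());
        }
        List<String> output = new ArrayList<>();
        for (String key : keys) {
            // Only the owning worker's state is consulted for this key.
            Map<String, Boolean> state = seenStatePerWorker.get(workerFor(key, numWorkers));
            Boolean seen = state.get(key);   // analogous to seenState.read()
            if (seen == null) {
                state.put(key, true);        // analogous to seenState.write(true)
                output.add(key);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("a", "b", "a", "c", "b");
        System.out.println(dedup(ids, 3)); // [a, b, c]
    }
}
```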

Re: getFailedInsertsWithErr and Storage Write API

2023-03-01 Thread Reuven Lax via user
Correct, however if you are using a recent version of Beam you can call WriteResult.getFailedStorageApiInserts On Wed, Mar 1, 2023 at 3:00 PM Matthew Ouyang wrote: > The documentation says WriteResult.getFailedInserts won’t return anything > when used with the Storage Write API ( >

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-17 Thread Reuven Lax via user
u tried setting spark.sql.adaptive.enabled & >> spark.sql.adaptive.coalescePartitions.enabled >> >> >> >> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user < >> user@beam.apache.org> wrote: >> >>> I see. Robert - what is the story for p

Re: Losing records when using BigQuery IO Connector

2023-04-17 Thread Reuven Lax via user
What version of Beam are you using? There are no known data-loss bugs in the connector, however if there has been a regression we would like to address it with high priority. On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van wrote: > Hi, > > I have a job that uses BigQuery IO Connector to write

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-18 Thread Reuven Lax via user
>> wrote: >>> >>>> To a (small) degree Sparks “new” AQE might be able to help depending on >>>> what kind of operations Beam is compiling it down to. >>>> >>>> Have you tried setting spark.sql.adaptive.enabled & >>>> spark.sq

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
); > }) > .withResults() > ); > result.apply(Wait.on(insert)) > .apply("Selecting", new SomeTransform()) > .apply("PubsubMessaging", ParDo.of(new NextTransformer())); > > https://githu

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
I believe you have to call withResults() on the JdbcIO transform in order for this to work. On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar wrote: > I hope you all are doing well. I am facing an issue with an Apache Beam > pipeline that gets stuck indefinitely when using the Wait.on transform >

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
at 10:28 AM Juan Cuzmar wrote: > I'm developing with direct runner. but should go to dataflow when > deployed. > > > Original Message > On Apr 22, 2023, 13:13, Reuven Lax via user < user@beam.apache.org> wrote: > > > What runner are you using to r

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
> .withResults() > ); > result.apply(Wait.on(insert)) > .apply("Selecting", new SomeTransform()) > .apply("PubsubMessaging", ParDo.of(new NextTransformer())); > p.run(); > > updated the gith

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-15 Thread Reuven Lax via user
The maximum parallelism is always determined by the parallelism of your data. If you do a GroupByKey for example, the number of keys in your data determines the maximum parallelism. Beyond the limitations in your data, it depends on your execution engine. If you're using Dataflow, Dataflow is
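
A simplified plain-Java model of the data-side bound: if each key's group is processed by a single worker, a GroupByKey over k distinct keys can keep at most min(k, workers) workers busy, and hash partitioning may leave even fewer busy when keys collide. The hashing scheme and the names `KeyParallelismBound`, `maxUsefulParallelism`, and `busyWorkers` are assumptions for illustration; actual runners use their own sharding.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class KeyParallelismBound {
    /** Upper bound on GroupByKey parallelism when each key's group goes to exactly one worker. */
    static int maxUsefulParallelism(List<String> keys, int numWorkers) {
        Set<String> distinct = new HashSet<>(keys);
        return Math.min(distinct.size(), numWorkers);
    }

    /** Counts the workers that actually receive at least one key under hash partitioning. */
    static int busyWorkers(List<String> keys, int numWorkers) {
        Set<Integer> busy = new HashSet<>();
        for (String key : new HashSet<>(keys)) {
            busy.add(Math.floorMod(key.hashCode(), numWorkers));
        }
        return busy.size();
    }

    public static void main(String[] args) {
        List<String> keys = List.of("us", "eu", "us", "asia", "eu");
        System.out.println(maxUsefulParallelism(keys, 100)); // 3
        System.out.println(busyWorkers(keys, 100));          // at most 3
    }
}
```

Adding workers beyond the number of distinct keys does not speed up this step; only adding keys to the data (for example, by sharding a hot key) raises the bound.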

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
el. And the input size of the operator is unknown at compiling stage if >>>>> it is not a source >>>>> operator, >>>>> >>>>> Here's an example of flink >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/d

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
>> >>> Here's an example of flink >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level >>> Spark also support to set operator level parallelism (see groupByKey >>> and reduceByKey): >>>

Re: major reduction is performance when using schema registry - KafkaIO

2023-04-09 Thread Reuven Lax via user
How are you using the schema registry? Do you have a code sample? On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov wrote: > Hello, > > I am trying to understand the effect of schema registry on our pipeline's > performance. In order to do so we created a very simple pipeline that reads > from

Re: Successful Inserts for Storage Write API?

2023-03-21 Thread Reuven Lax via user
ar with Storage Write. > > On Thu, Mar 2, 2023 at 4:48 PM Reuven Lax via user > wrote: > >> Are you trying to do this in order to use Wait.on? getSuccessfulInserts >> is not currently supported for Storage Write API. >> >> On Thu, Mar 2, 2023 at 1:44 PM

Re: Some events are discarded from a FixedWindow

2024-02-21 Thread Reuven Lax via user
On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user < user@beam.apache.org> wrote: > Hi, > > > > We have a Beam-SQL pipeline on top of Flink, that once in 5 min gets a > bunch of events from Kafka and should execute an SQL command on a 1-hour > window. Some of the events arrive late. > >

Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2023-12-12 Thread Reuven Lax via user
Are you setting the enable_custom_pubsub_source experiment by any chance? On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin wrote: > Hi all, > > When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0 to > 2.52.0, an incompatibility warning is surfaced that prevents pipeline > upgrade:

Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-18 Thread Reuven Lax via user
Some comments here: 1. "All messages in a PubSub topic" is not a well-defined statement, as there can always be more messages published. You may know that nobody will publish any more messages, but the pipeline does not. 2. While it's possible to read from Pub/Sub in batch, it's usually not

Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread Reuven Lax via user
This is the stack trace of the rethrown exception. The log should also contain a "caused by" log somewhere detailing the original exception. Do you happen to have that? On Thu, Dec 7, 2023 at 8:46 AM hsy...@gmail.com wrote: > Here is the complete stacktrace It doesn't even hit my code and it >

Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
Can you explain the use case a bit more? In order to write a SQL statement (at least one that doesn't use wildcard selection) you also need to know the schema ahead of time. What are you trying to accomplish with these dynamic schemas? Reuven On Sun, Jan 28, 2024 at 2:30 AM Sigalit Eliazov

Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
am. > I would like to be able to define the SQL query via configuration. > In addition, in our use case the Kafka message schema and the row schema > are pretty much the same. So I wonder if I could reuse it. > > Thanks > Sigalit > > On Sun, Jan 28, 2024, 20:23, Reu

Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-21 Thread Reuven Lax via user
is a bounded input. > > _/ > _/ Alex Van Boxel > > > On Fri, Jan 19, 2024 at 12:18 AM Reuven Lax via user > wrote: > >> Some comments here: >>1. "All messages in a PubSub topic" is not a well-defined statement, as >> there can always be more messages pu

Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
: > ProcessEvents receives as an input a Session object and creates a KV SharedCoreEvent> as an output > > On Thu, Apr 4, 2024 at 8:52 PM, Reuven Lax via user < > user@beam.apache.org> wrote: > >> There are some sharp edges unfortunately around auto-inferen

Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are some sharp edges unfortunately around auto-inference of KV coders and schemas. Is there a previous PCollection of type SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents? On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas wrote: > Hello guys > > I have a question, is it

Re: How to handle Inheritance with AutoValueSchema

2024-04-09 Thread Reuven Lax via user
I don't see any unit tests for inherited AutoValue accessors, so I suspect it simply does not work today with AutoValueSchema. This is something that's probably fixable (though such a fix does risk breaking some users). On Mon, Apr 8, 2024 at 11:21 PM Ruben Vargas wrote: > Hello Guys > > I have

Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Reuven Lax via user
There are various strategies. Here is an example of how Beam does it (taken from Reshuffle.viaRandomKey().withNumBuckets(N)). Note that this does some extra hashing to work around issues with the Spark runner. If you don't care about that, you could implement something simpler (e.g. initialize
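
A plain-Java sketch of the general idea of keying for GroupIntoBatches, assuming each element simply receives a uniformly random bucket key in [0, numBuckets) and each bucket is then cut into fixed-size batches. This is not Beam's Reshuffle code and omits the extra hashing mentioned above; `RandomBucketBatcher` and its parameters are hypothetical. In a pipeline, the key assignment would typically be a DoFn emitting `KV<Integer, T>` ahead of GroupIntoBatches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class RandomBucketBatcher {
    /** Assigns each element a random key in [0, numBuckets), then splits each key's elements into batches. */
    static <T> Map<Integer, List<List<T>>> batch(List<T> elements, int numBuckets, int batchSize, Random random) {
        Map<Integer, List<List<T>>> batchesByKey = new HashMap<>();
        for (T element : elements) {
            // numBuckets caps how many distinct keys exist, and hence how many partial batches are open at once.
            int key = random.nextInt(numBuckets);
            List<List<T>> batches = batchesByKey.computeIfAbsent(key, k -> new ArrayList<>());
            // Start a new batch when this key has none yet or its current batch is full.
            if (batches.isEmpty() || batches.get(batches.size() - 1).size() == batchSize) {
                batches.add(new ArrayList<>());
            }
            batches.get(batches.size() - 1).add(element);
        }
        return batchesByKey;
    }

    public static void main(String[] args) {
        List<Integer> elements = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            elements.add(i);
        }
        Map<Integer, List<List<Integer>>> result = batch(elements, 4, 10, new Random(42));
        result.forEach((key, batches) -> System.out.println("key " + key + ": " + batches.size() + " batches"));
    }
}
```

The choice of `numBuckets` is the main trade-off: too few keys limits parallelism, while too many leaves many small, partially filled batches per key.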