Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Reuven Lax via user
There are various strategies. Here is an example of how Beam does it (taken
from Reshuffle.viaRandomKey().withNumBuckets(N)).

Note that this does some extra hashing to work around issues with the Spark
runner. If you don't care about that, you could implement something simpler
(e.g. initialize the shard to a random number in StartBundle, and increment it
mod numBuckets in each ProcessElement call); a sketch of that simpler variant
follows the Beam code below.

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues
    // with runners that don't properly hash the key being shuffled, but rely
    // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
    // which for Integer is a no-op and it is an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}
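
For illustration, here is a minimal sketch of that simpler StartBundle-based variant (SimpleAssignShardFn is a hypothetical name, not Beam code; same imports as the snippet above; it omits the hash smearing, so it assumes a runner that hashes shuffle keys properly):

public static class SimpleAssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private final int numBuckets;
  private int shard;

  public SimpleAssignShardFn(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @StartBundle
  public void startBundle() {
    // Start each bundle at a random shard so different bundles spread across buckets.
    shard = ThreadLocalRandom.current().nextInt(numBuckets);
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
    // Round-robin within the bundle, wrapping at numBuckets.
    shard = (shard + 1) % numBuckets;
    r.output(KV.of(shard, element));
  }
}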



On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas 
wrote:

> Good day, Ruben,
>
> Would you be able to compute a shasum on the group of IDs to use as the
> key?
>
> Best,
>
> Damon
>
> On 2024/04/12 19:22:45 Ruben Vargas wrote:
> > Hello guys
> >
> > Maybe this question was already answered, but I cannot find it  and
> > want some more input on this topic.
> >
> > I have some messages that don't have any particular key candidate,
> > except the ID,  but I don't want to use it because the idea is to
> > group multiple IDs in the same batch.
> >
> > This is my use case:
> >
> > I have an endpoint where I'm gonna send the message ID, this endpoint
> > is gonna return me certain information which I will use to enrich my
> > message. In order to avoid calling the endpoint per message I want to
> > batch the messages in groups of 100 and send the 100 IDs in one request (the
> > endpoint supports it). I was thinking of using GroupIntoBatches.
> >
> > - If I choose the ID as the key, my understanding is that it won't
> > work in the way I want (because it will form batches of the same ID).
> > - Using a constant will be a problem for parallelism, is that correct?
> >
> > Then my question is, what should I use as a key? Maybe something
> > regarding the timestamp? so I can have groups of messages that arrive
> > at a certain second?
> >
> > Any suggestions would be appreciated
> >
> > Thanks.
> >
>


Re: How to handle Inheritance with AutoValueSchema

2024-04-09 Thread Reuven Lax via user
I don't see any unit tests for inherited AutoValue accessors, so I suspect
it simply does not work today with AutoValueSchema. This is something
that's probably fixable (though such a fix does risk breaking some users).

On Mon, Apr 8, 2024 at 11:21 PM Ruben Vargas 
wrote:

> Hello Guys
>
> I have a PCollection with a "Session" object, inside that object I
> have a list of events
>
> For each event, I have different types, EventA, EventB, EventC and so
> on.. all of them extend from Event, which will contain common fields.
>
> According to the AutoValue documentation, inheritance from another
> AutoValue class is not supported, but extending a supertype to inherit its fields is.
> (
> https://github.com/google/auto/blob/main/value/userguide/howto.md#-have-autovalue-also-implement-abstract-methods-from-my-supertypes
> )
>
> But when I run my pipeline, it fails with an NPE.
>
> Caused by: java.lang.NullPointerException
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:153)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:143)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> ~[?:?]
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> ~[?:?]
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> ~[?:?]
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> ~[?:?]
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> ~[?:?]
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> ~[?:?]
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> ~[?:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:144)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
> ~[?:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:138)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:93)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:145)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:130)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
> ~[beam-sdks-java-core-2.55.0.jar:?]
>
>
> Is this not supported? or is it a Bug?  should I file an issue in GH then?
>
> Thanks
>


Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are ways you can manually force the coder here. However I would first
try to split up the KV creation into two operations. Have ProcessEvents
just create a PCollection, and then a following operation
to create the KV. Something like this:

input.apply(ParDo.of(new ProcessEvents()))
     .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>) ExtractKeyFunction)
            .withKeyType(TypeDescriptors.longs()));

I suspect that this will allow the mechanism to better infer the final
Coder. If that doesn't work, you could always brute force it like this:

PCollection<SharedCoreEvent> coreEvents = input.apply(ParDo.of(new ProcessEvents()));
coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>) ExtractKeyFunction)
              .withKeyType(TypeDescriptors.longs()))
     .setCoder(KvCoder.of(LongCoder.of(), coreEvents.getCoder()))
 .apply(Reshuffle.of())
 ... etc.


On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas  wrote:

> ProcessEvents receives a Session object as input and creates a KV<..., SharedCoreEvent> as output
>
> El El jue, 4 de abr de 2024 a la(s) 8:52 p.m., Reuven Lax via user <
> user@beam.apache.org> escribió:
>
>> There are some sharp edges unfortunately around auto-inference of KV
>> coders and schemas. Is there a previous PCollection of type
>> SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents?
>>
>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas 
>> wrote:
>>
>>> Hello guys
>>>
>>> I have a question, is it possible to use KV along with AutoValueSchema
>>> objects? I'm having troubles when I try to use it together.
>>>
>>> I have an object like the following
>>>
>>> @AutoValue
>>> @DefaultSchema(AutoValueSchema.class)
>>> public abstract class SharedCoreEvent {
>>>
>>> @JsonProperty("subscriptionId")
>>> public abstract String getSubscription();
>>>
>>> 
>>> }
>>>
>>> Then I have a pipeline like the following:
>>>
>>>  input.apply(ParDo.of(new ProcessEvents()))
>>> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>>
>>> My input is a single object and my ProcessEvents will produce tons of
>>> events, in a fan-out fashion. that is why I used Reshuffle here
>>>
>>> But when I run this pipeline it throws the following error:
>>>
>>> java.lang.IllegalStateException: Unable to return a default Coder for
>>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>>> [PCollection@2131266396]. Correct one of the following root causes:
>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>
>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide
>>> coder for parameterized type
>>> org.apache.beam.sdk.values.KV:
>>> Unable to provide a Coder for events.SharedCoreEvent
>>>   Building a Coder using a registered CoderProvider failed.
>>>
>>>
>>> Something similar happens with my source when I use KafkaIO and the
>>> source produces a KV  PCollection.
>>>
>>> Is there any reason for this? Maybe I'm misusing the schemas?
>>>
>>> Really appreciate your help
>>>
>>> Thanks
>>> Ruben
>>>
>>


Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are some sharp edges unfortunately around auto-inference of KV coders
and schemas. Is there a previous PCollection of type SharedCoreEvent, or is
the SharedCoreEvent created in ProcessEvents?

On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas  wrote:

> Hello guys
>
> I have a question, is it possible to use KV along with AutoValueSchema
> objects? I'm having troubles when I try to use it together.
>
> I have an object like the following
>
> @AutoValue
> @DefaultSchema(AutoValueSchema.class)
> public abstract class SharedCoreEvent {
>
> @JsonProperty("subscriptionId")
> public abstract String getSubscription();
>
> 
> }
>
> Then I have a pipeline like the following:
>
>  input.apply(ParDo.of(new ProcessEvents()))
> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>
> My input is a single object and my ProcessEvents will produce tons of
> events, in a fan-out fashion. that is why I used Reshuffle here
>
> But when I run this pipeline it throws the following error:
>
> java.lang.IllegalStateException: Unable to return a default Coder for
> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
> [PCollection@2131266396]. Correct one of the following root causes:
>   No Coder has been manually specified;  you may do so using .setCoder().
>
>   Inferring a Coder from the CoderRegistry failed: Cannot provide
> coder for parameterized type
> org.apache.beam.sdk.values.KV:
> Unable to provide a Coder for events.SharedCoreEvent
>   Building a Coder using a registered CoderProvider failed.
>
>
> Something similar happens with my source when I use KafkaIO and the
> source produces a KV  PCollection.
>
> Is there any reason for this? Maybe I'm misusing the schemas?
>
> Really appreciate your help
>
> Thanks
> Ruben
>


Re: Some events are discarded from a FixedWindow

2024-02-21 Thread Reuven Lax via user
On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user <
user@beam.apache.org> wrote:

> Hi,
>
>
>
> We have a Beam-SQL pipeline on top of Flink, that once in 5 min gets a
> bunch of events from Kafka and should execute an SQL command on a 1-hour
> window. Some of the events arrive late.
>
> I’m using KafkaIO.withTimestampPolicyFactory() to set one of the object’s
> fields as the timestamp.
>
> For the aggregation, it’s important that the window triggers *exactly
> once* with all the events, with allowed lateness of 3 minutes. I defined
> the window as:
>
>
>
> final PCollection windowSelectFields = selectFields
>
> .apply("Windowing", Window
>
>
> .into(FixedWindows.of(Duration.standardHours(1)))
>
> .triggering(Repeatedly.forever(
>
>
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(3)))
>
> )
>
> .withAllowedLateness(Duration.standardMinutes(3))
>
> .accumulatingFiredPanes()
>
> );
>
>
>
> When tested on a smaller window with a small number of events, I see that
> the first 3 out of 10 events are being discarded. From the log, it looks
> like the trigger is executed *1 second ahead of time*. I suspect that as
> a result, its shouldFire() method returns false, since 3 minutes have not
> passed yet.
>

Processing-time triggers are based on the local clock on a worker, and
clocks can skew between workers (they can even skew between different
processes on the same worker).


>
>
> 2024-02-21 16:27:08,079 DEBUG
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
> [] - Setting timer: 1:1708533008079 at 1708533008079 with output time
> 1708533008079.  (that is *4:30:08.079 PM*)
>
>
>
> And later on:
>
>
>
> 2024-02-21 *16:30:07,944* DEBUG
> org.apache.beam.sdk.util.WindowTracing   [] -
> ReduceFnRunner: Received timer key:Row:
>
> call_direction:-1729318488
>
> ; window:[2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z);
> data:TimerData{timerId=1:1708533008079, timerFamilyId=,
> namespace=Window([2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z)),
> timestamp=2024-02-*21T16:30:08.079Z*,
> outputTimestamp=2024-02-21T16:30:08.079Z, domain=PROCESSING_TIME,
> deleted=false} with inputWatermark:2024-02-21T16:18:04.000Z;
> outputWatermark:2024-02-21T16:18:04.000Z
>
>
>
> Is my understanding correct?
>
> Did I define the window and timestamps correctly?
>
> Any help would be appreciated.
>
>
>
> Thanks,
>
> Ifat
>
>
>


Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
If you want to define the sql query via configuration, does that mean you
know the query when you launch the pipeline (as configuration is set at
launch time)? If so, you can also dynamically set the schema at pipeline
launch time.
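
As an illustrative sketch (not tested): if the Avro schema can be resolved from configuration or the schema registry at pipeline-construction time, the row schema and the query can both be plugged in before the pipeline is submitted. fetchSchemaFromRegistry and options.getSqlQuery() are hypothetical names, not existing APIs:

// Resolve the schema once, at pipeline-construction (launch) time.
String avroSchemaJson = fetchSchemaFromRegistry(options.getSourceKafkaTopic());  // hypothetical helper
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaJson);
Schema rowSchema = AvroUtils.toBeamSchema(avroSchema);

PCollection<Row> features =
    avroMessages.apply(ParDo.of(new ConvertToRow())).setRowSchema(rowSchema);
PCollection<Row> result =
    features.apply(SqlTransform.query(options.getSqlQuery()));  // hypothetical: query from configuration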

On Sun, Jan 28, 2024 at 11:21 AM Sigalit Eliazov 
wrote:

> I'm trying to define a generic pipeline that reads messages from Kafka where
> the topic is configurable.
> We read a generic record, and the schema id is part of the consumed message,
> so this part can be generalized.
>
> Then we would like to activate different queries on the stream.
> I would like to be able to define the sql query via configuration.
> In addition in our use case the kafka message schema and the row schema
> are pretty much the same. So i wonder if i could reuse it.
>
> Thanks
> Sigalit
>
> On Sun, Jan 28, 2024 at 20:23 Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>> Can you explain the use case a bit more? In order to write a SQL
>> statement (at least one that doesn't use wildcard selection) you also need
>> to know the schema ahead of time. What are you trying to accomplish with
>> these dynamic schemas?
>>
>> Reuven
>>
>> On Sun, Jan 28, 2024 at 2:30 AM Sigalit Eliazov 
>> wrote:
>>
>>> Hello, In the upcoming process, we extract Avro messages from Kafka 
>>> utilizing the Confluent Schema Registry.
>>>
>>> Our intention is to implement SQL queries on the streaming data.
>>>
>>>
>>> As far as I understand, since I am using the Flink runner, when creating  
>>> the features PCollection, I must specify the
>>>
>>> row schema or a coder.
>>>
>>>
>>> I am interested in utilizing the schema obtained from the recently read 
>>> message (refer to ConvertRow).
>>>
>>> Is it possible to accomplish this when executing on the Flink runner?
>>>
>>> I noticed that the Flink runner anticipates the row schema to be 
>>> predetermined during pipeline deployment.
>>>
>>>
>>> Are there any potential solutions or workarounds for this situation?
>>>
>>>
>>> public class BeamSqlTest {
>>>
>>>
>>> public static void main(String[] args) {
>>>
>>>     Pipeline pipeline;
>>>     PCollection<KV<String, GenericRecord>> readAvroMessageFromKafka =
>>>         pipeline.apply("readAvroMessageFromKafka",
>>>             KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(
>>>                 pipelineUtil.getBootstrapServers(), options.getSourceKafkaTopic(), PIPELINE_NAME));
>>>     PCollection<KV<String, GenericRecord>> avroMessages =
>>>         readAvroMessageFromKafka.apply("convertFromKafkaRecord",
>>>             ParDo.of(new ConvertFromKafkaRecord<>()));
>>>
>>>     PCollection<Row> features =
>>>         avroMessages.apply(ParDo.of(new ConvertToRow())).setRowSchema(XXX);
>>>     final PCollection<Row> select_fields =
>>>         features.apply("Select Fields", Select.fieldNames("X", "Y", "Z"));
>>>
>>>     final PCollection<Row> windowRes =
>>>         select_fields.apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>>>     PCollection<Row> outputStream =
>>>         windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));
>>>     pipeline.run().waitUntilFinish();
>>> }
>>>
>>> @AllArgsConstructor
>>> public static class ConvertToRow extends DoFn<KV<String, GenericRecord>, Row> {
>>>     @ProcessElement
>>>     @SuppressWarnings({"ConstantConditions", "unused"})
>>>     public void processElement(ProcessContext c) {
>>>         GenericRecord record = c.element().getValue();
>>>         final org.apache.avro.Schema avroSchema = record.getSchema();
>>>         Schema schema = AvroUtils.toBeamSchema(avroSchema);
>>>
>>>         Object x = record.get("X");
>>>         Object y = record.get("Y");
>>>         Object z = record.get("Z");
>>>         Row row = Row.withSchema(schema).addValues(x, y, z).build();
>>>         c.output(row);
>>>     }
>>> }
>>> }
>>>
>>>
>>> Thanks
>>>
>>> Sigalit
>>>
>>>


Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
Can you explain the use case a bit more? In order to write a SQL statement
(at least one that doesn't use wildcard selection) you also need to know
the schema ahead of time. What are you trying to accomplish with these
dynamic schemas?

Reuven

On Sun, Jan 28, 2024 at 2:30 AM Sigalit Eliazov  wrote:

> Hello, In the upcoming process, we extract Avro messages from Kafka utilizing 
> the Confluent Schema Registry.
>
> Our intention is to implement SQL queries on the streaming data.
>
>
> As far as I understand, since I am using the Flink runner, when creating  the 
> features PCollection, I must specify the
>
> row schema or a coder.
>
>
> I am interested in utilizing the schema obtained from the recently read 
> message (refer to ConvertRow).
>
> Is it possible to accomplish this when executing on the Flink runner?
>
> I noticed that the Flink runner anticipates the row schema to be 
> predetermined during pipeline deployment.
>
>
> Are there any potential solutions or workarounds for this situation?
>
>
> public class BeamSqlTest {
>
>
> public static void main(String[] args) {
>
>     Pipeline pipeline;
>     PCollection<KV<String, GenericRecord>> readAvroMessageFromKafka =
>         pipeline.apply("readAvroMessageFromKafka",
>             KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(
>                 pipelineUtil.getBootstrapServers(), options.getSourceKafkaTopic(), PIPELINE_NAME));
>     PCollection<KV<String, GenericRecord>> avroMessages =
>         readAvroMessageFromKafka.apply("convertFromKafkaRecord",
>             ParDo.of(new ConvertFromKafkaRecord<>()));
>
>     PCollection<Row> features =
>         avroMessages.apply(ParDo.of(new ConvertToRow())).setRowSchema(XXX);
>     final PCollection<Row> select_fields =
>         features.apply("Select Fields", Select.fieldNames("X", "Y", "Z"));
>
>     final PCollection<Row> windowRes =
>         select_fields.apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>     PCollection<Row> outputStream =
>         windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));
>     pipeline.run().waitUntilFinish();
> }
>
> @AllArgsConstructor
> public static class ConvertToRow extends DoFn<KV<String, GenericRecord>, Row> {
>     @ProcessElement
>     @SuppressWarnings({"ConstantConditions", "unused"})
>     public void processElement(ProcessContext c) {
>         GenericRecord record = c.element().getValue();
>         final org.apache.avro.Schema avroSchema = record.getSchema();
>         Schema schema = AvroUtils.toBeamSchema(avroSchema);
>
>         Object x = record.get("X");
>         Object y = record.get("Y");
>         Object z = record.get("Z");
>         Row row = Row.withSchema(schema).addValues(x, y, z).build();
>         c.output(row);
>     }
> }
> }
>
>
> Thanks
>
> Sigalit
>
>


Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-21 Thread Reuven Lax via user
Cloud Storage subscriptions are a reasonable way to backup data to storage,
and you can then run a batch pipeline over the GCS files. Keep in mind that
these files might contain duplicates (the storage subscriptions do not
guarantee exactly-once writes). If this is a problem, you should add a
deduplication stage to the batch job that processes these files.
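
For the deduplication step, a minimal sketch (the bucket path, exportSchema, and the message_id field are assumptions; message_id is only present if the Cloud Storage subscription is configured to write Pub/Sub metadata):

PCollection<GenericRecord> records =
    p.apply(AvroIO.readGenericRecords(exportSchema).from("gs://my-bucket/pubsub-backup/*.avro"));

// Keep one record per Pub/Sub message id to drop the occasional duplicate write.
PCollection<GenericRecord> deduped =
    records.apply(
        Distinct.<GenericRecord, String>withRepresentativeValueFn(
                r -> String.valueOf(r.get("message_id")))
            .withRepresentativeType(TypeDescriptor.of(String.class)));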

On Sun, Jan 21, 2024 at 2:45 AM Alex Van Boxel  wrote:

> There are some valid use cases where you want to process data that goes over
> Pubsub in batch. It's way too expensive to run a simple daily
> extract from the data over Pubsub; batch is a lot cheaper.
>
> What we do is backup the data to Cloud Storage; Pubsub has recently added
> a nice feature that can help you:
>
>- https://cloud.google.com/pubsub/docs/cloudstorage
>-
>
> https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription#subscription_properties
>
> This reduced our cost dramatically. We had a Dataflow job doing the backup to
> Cloud Storage, but the above feature is way cheaper. Use the export to Avro
> (the schema is in the second link), and then your batch beam pipeline input
> is a bounded input.
>
>  _/
> _/ Alex Van Boxel
>
>
> On Fri, Jan 19, 2024 at 12:18 AM Reuven Lax via user 
> wrote:
>
>> Some comments here:
>>1. "All messages in a PubSub topic" is not a well-defined statement, as
>> there can always be more messages published. You may know that nobody will
>> publish any more messages, but the pipeline does not.
>>2. While it's possible to read from Pub/Sub in batch, it's usually not
>> recommended. For one thing I don't think that the batch runner can maintain
>> exactly-once processing when reading from Pub/Sub.
>>3. In Java you can turn an unbounded source (Pub/Sub) into a bounded
>> source that can in theory be used for batch jobs. However this is done by
>> specifying either the max time to read or the max number of messages. I
>> don't think there's any way to automatically read the Pub/Sub topic until
>> there are no more messages in it.
>>
>> Reuven
>>
>> On Thu, Jan 18, 2024 at 2:25 AM Sumit Desai via user <
>> user@beam.apache.org> wrote:
>>
>>> Hi all,
>>>
>>> I want to create a Dataflow pipeline using Pub/sub as an input connector
>>> but I want to run it in batch mode and not streaming mode. I know it's not
>>> possible in Python but how can I achieve this in Java? Basically, I want my
>>> pipeline to read all messages in a Pubsub topic, process and terminate.
>>> Please suggest.
>>>
>>> Thanks & Regards,
>>> Sumit Desai
>>>
>>


Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-18 Thread Reuven Lax via user
Some comments here:
   1. "All messages in a PubSub topic" is not a well-defined statement, as
there can always be more messages published. You may know that nobody will
publish any more messages, but the pipeline does not.
   2. While it's possible to read from Pub/Sub in batch, it's usually not
recommended. For one thing I don't think that the batch runner can maintain
exactly-once processing when reading from Pub/Sub.
   3. In Java you can turn an unbounded source (Pub/Sub) into a bounded
source that can in theory be used for batch jobs. However this is done by
specifying either the max time to read or the max number of messages. I
don't think there's any way to automatically read the Pub/Sub topic until
there are no more messages in it.

Reuven

On Thu, Jan 18, 2024 at 2:25 AM Sumit Desai via user 
wrote:

> Hi all,
>
> I want to create a Dataflow pipeline using Pub/sub as an input connector
> but I want to run it in batch mode and not streaming mode. I know it's not
> possible in Python but how can I achieve this in Java? Basically, I want my
> pipeline to read all messages in a Pubsub topic, process and terminate.
> Please suggest.
>
> Thanks & Regards,
> Sumit Desai
>


Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2023-12-12 Thread Reuven Lax via user
Are you setting the enable_custom_pubsub_source experiment by any chance?

On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin  wrote:

> Hi all,
>
> When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0 to
> 2.52.0, an incompatibility warning is surfaced that prevents pipeline
> upgrade:
>
>
>> The Coder or type for step .../PubsubUnboundedSource has changed
>
>
> Was there an intentional coder change introduced for PubsubMessage in
> 2.52.0?  I didn't note anything in the release notes
> https://beam.apache.org/blog/beam-2.52.0/ nor recent changes in
> PubsubMessageWithAttributesCoder[1].  Specifically the step uses
> `PubsubMessageWithAttributesCoder` via
> `PubsubIO.readMessagesWithAttributes()`
>
> Thanks!
>
>
> [1]
> https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36
>


Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread Reuven Lax via user
This is the stack trace of the rethrown exception. The log should also
contain a "caused by" log somewhere detailing the original exception. Do
you happen to have that?

On Thu, Dec 7, 2023 at 8:46 AM hsy...@gmail.com  wrote:

> Here is the complete stack trace. It doesn't even hit my code and it
> happens consistently!
>
> Error message from worker: java.lang.RuntimeException:
> java.lang.IllegalStateException
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
> Caused by: java.lang.IllegalStateException
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:430)
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
> org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
>
> Regards,
> Siyuan
>
> On Wed, Dec 6, 2023 at 10:12 AM Ahmed Abualsaud 
> wrote:
>
>> Hey, can you provide the full stack trace for the error you're seeing?
>> Also is this happening consistently?
>>
>> *+1* to raising a Google ticket where we'll have more visibility.
>>
>> On Wed, Dec 6, 2023 at 11:33 AM John Casey 
>> wrote:
>>
>>> Hmm. It may be best if you raise a ticket with 

Re: simplest way to do exponential moving average?

2023-10-02 Thread Reuven Lax via user
On Mon, Oct 2, 2023 at 2:00 AM Jan Lukavský  wrote:

> Hi,
>
> this depends on how exactly you plan to calculate the average. The
> original definition is based on exponentially decreasing weight of more
> distant (older if time is on the x-axis) data points. This (technically)
> means that this average at any point X1 depends on all values X0 <= X1.
> This would therefore require buffering (using GroupByKey) all elements in
> global window, doing the sorting manually and then computing the new value
> of the average triggering after each element. This is probably the
> technically correct, but most computationally intensive variant.
>

To clarify - you would probably buffer the elements in OrderedListState,
and set periodic event-time timers to fetch them and compute the average.
OrderedListState will return the elements in order, so you wouldn't have to
sort. This is assuming you are talking about streaming pipelines.
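
For concreteness, here is a minimal sketch of that approach (assumed names, a hypothetical smoothing factor ALPHA, and an input assumed to be keyed as KV<String, Double>, since state and timers require a keyed PCollection). It re-reads the buffer on every firing and never clears it, matching the "technically correct but computationally intensive" variant described above:

public static class EmaFn extends DoFn<KV<String, Double>, Double> {
  private static final double ALPHA = 0.1;  // hypothetical smoothing factor

  @StateId("values")
  private final StateSpec<OrderedListState<Double>> valuesSpec =
      StateSpecs.orderedList(DoubleCoder.of());

  @TimerId("emit")
  private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Double> element,
      @Timestamp Instant ts,
      @StateId("values") OrderedListState<Double> values,
      @TimerId("emit") Timer emit) {
    // Buffer the value with its event timestamp; OrderedListState keeps it sorted by timestamp.
    values.add(TimestampedValue.of(element.getValue(), ts));
    // (Re)arm an event-time timer one minute past this element's timestamp.
    emit.set(ts.plus(Duration.standardMinutes(1)));
  }

  @OnTimer("emit")
  public void onEmit(
      OnTimerContext c, @StateId("values") OrderedListState<Double> values) {
    Double ema = null;
    // Values come back in timestamp order, so no manual sorting is needed.
    for (TimestampedValue<Double> tv : values.read()) {
      ema = (ema == null) ? tv.getValue() : ALPHA * tv.getValue() + (1 - ALPHA) * ema;
    }
    if (ema != null) {
      c.output(ema);
    }
  }
}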


> If the average is done over time intervals, then an other option could be
> to define a cut-off interval T, i.e. set the exponentially vanishing weight
> of value of data points to be zero at some T0 < T1 - T. If the data points
> come at some discrete time-intervals (minutes, hours, days), then this
> could mean you can split the data into time sliding windows (window
> interval being the cut-off interval, and slide the update interval) and
> assign weight for each data point in the particular time interval - i.e.
> how much weight does the data point have at the time of end of the sliding
> window. With this you could then using CombineFn to count and sum the
> weighted averages, which would be much more efficient.
>
> Best,
>
>  Jan
> On 9/30/23 17:08, Balogh, György wrote:
>
> Hi,
> I want to calculate the exponential moving average of a signal using beam
> in java.
> I understand there is no time order guarantee on incoming data. What would
> be the simplest solution for this?
> Thank you,
>
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342 <+36%2030%20270%208342>
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>
>


Re: [QUESTION] Why no auto labels?

2023-10-01 Thread Reuven Lax via user
Are you talking about transform names? The main reason was because for
runners that support updating pipelines in place, the only way to do so
safely is if the runner can perfectly identify which transforms in the new
graph match the ones in the old graph. There's no good way to auto generate
names that will stay stable across updates - even small changes to the
pipeline might change the order of nodes in the graph, which could result
in a corrupted update.

However, if you don't care about update, Beam can auto generate these names
for you! When you call PCollection.apply (if using BeamJava), simply omit
the name parameter and Beam will auto generate a unique name for you.

Reuven

On Sat, Sep 30, 2023 at 11:54 AM Joey Tran 
wrote:

> After writing a few pipelines now, I keep getting tripped up by
> accidentally having duplicate labels from using multiple of the same
> transforms without labeling them. I figure this must be a common complaint,
> so I was just curious, what the rationale behind this design was? My naive
> thought off the top of my head is that it'd be more user friendly to just
> auto increment duplicate transforms, but I figure I must be missing
> something
>
> Cheers,
> Joey
>


Re: @FieldAccess parameter types not being enforced vs corresponding schema types in Java DoFn

2023-09-18 Thread Reuven Lax via user
Good question - I know it will be enforced at runtime (I think you'll get a
ClassCastException if things don't match) but I'd have to check to see if
we enforce it at graph-submission time.

If we don't have this validation in place, adding it would be an
improvement.

On Mon, Sep 18, 2023 at 3:04 PM Varun Golani 
wrote:

> I've been looking at the following session from the 2019 Beam Summit
> covering Schema-aware PCollections (18:00 -
> https://www.youtube.com/watch?v=aRIZXtQiCHw). The specific part of the
> video mentions that for fields accessed via the @FieldAccess annotation,
> Beam checks for both the existence of the field AND matches the type during
> pipeline construction time.
>
> When trying this out on my application locally, I do see that Beam flags
> up incorrectly specified fields that don't exist in the PCollection's
> schema. However it seems as if it doesn't enforce/check the types of the
> parameter variable match the types specified in the Schema.
>
> For instance if I have a PCollection containing an attribute "field1" with
> type String, when referring to this attribute in my DoFn using 
> *@FieldAccess("field1")
> Double field1 *it does NOT flag this String/Double type mismatch during
> pipeline construction similar to how it would do if I refer to an
> attribute that doesn't exist in the schema.
>
> Is this expected behaviour? If so, would there be any way in which I could
> write a custom validator which strongly enforces @FieldAccess parameter
> types against the types in the corresponding schema?
>


Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Reuven Lax via user
Correct - I was referring to Java.

On Fri, Sep 15, 2023 at 9:55 AM Robert Bradshaw  wrote:

> On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user 
> wrote:
>
>> Creating composite DoFns is tricky today due to how they are implemented
>> (via annotated methods).
>>
>
> Note that this depends on the language. This should be really easy to do
> from Python.
>
>
>> However providing such a method to compose DoFns would be very useful IMO.
>>
>
> +1
>
>
>> On Fri, Sep 15, 2023 at 9:33 AM Joey Tran 
>> wrote:
>>
>>> Yeah for (1) the concern would be adding a shuffle/fusion break and (2)
>>> sounds like the likely solution, was just hoping there'd be one that could
>>> wrap at the PTransform level but I realize now the PTransform abstraction
>>> is too general as you mentioned to do something like that.
>>>
>>> (2) will be likely what we do, though now I'm wondering if it might be
>>> possible to create a ParDo wrapper that can take a ParDo, extract its
>>> DoFn, wrap it, and return a new ParDo
>>>
>>> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> +1 to looking at composite transforms. You could even have a composite
>>>> transform that takes another transform as one of its construction arguments
>>>> and whose expand method does pre- and post-processing to the inputs/outputs
>>>> before/after applying the transform in question. (You could even implement
>>>> this as a Python decorator if you really wanted, either decorating the
>>>> expand method itself or the full class...)
>>>>
>>>> One of the difficulties is that for a general transform there isn't
>>>> necessarily a 1:N relationship between outputs and inputs as one has for a
>>>> DoFn (especially if there is any aggregation involved). There are, however,
>>>> two partial solutions that might help.
>>>>
>>>> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that
>>>> returns at most N elements. You could do this with a CombinePerKey if you
>>>> can come up with a reasonable key (e.g. the id of your input elements) that
>>>> the limit should be a applied to. Note that this may cause a lot of data to
>>>> be shuffled (though due to combiner lifting, no more than N per bundle).
>>>>
>>>> (2) You could have a DoFn that limits to N per bundle by initializing a
>>>> counter in its start_bundle and passing elements through until the counter
>>>> reaches a threshold. (Again, one could do this per id if one is available.)
>>>> It wouldn't stop production of the elements, but if things get fused it
>>>> would still likely be fairly cheap.
>>>>
>>>> Both of these could be prepended to the problematic consuming
>>>> PTransform as well.
>>>>
>>>> - Robert
>>>>
>>>>
>>>>
>>>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran 
>>>> wrote:
>>>>
>>>>> I'm aware of composite transforms and of the distributed nature of
>>>>> PTransforms. I'm not suggesting limiting the entire set and my example was
>>>>> more illustrative than the actual use case.
>>>>>
>>>>> My actual use case is basically: I have multiple PTransforms, and
>>>>> let's say most of them average ~100 generated outputs for a single input.
>>>>> Most of these PTransforms will occasionally run into an input though that
>>>>> might output maybe 1M outputs. This can cause issues if for example there
>>>>> are transforms that follow it that require a lot of compute per input.
>>>>>
>>>>> The simplest way to deal with this is to modify the `DoFn`s in our
>>>>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
>>>>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
>>>>> our transforms, but it'd be much cleaner if we could lift up this limiting
>>>>> logic out of the application logic and have some generic wrapper that
>>>>> extends our transforms.
>>>>>
>>>>> Thanks for the discussion!
>>>>>
>>>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
>>>>> aromanenko@gmail.com> wrote:
>>>>>
>>>>>> I don’t think it’s possible to extend in a way that you are 

Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Reuven Lax via user
Creating composite DoFns is tricky today due to how they are implemented
(via annotated methods). However providing such a method to compose DoFns
would be very useful IMO.

On Fri, Sep 15, 2023 at 9:33 AM Joey Tran  wrote:

> Yeah for (1) the concern would be adding a shuffle/fusion break and (2)
> sounds like the likely solution, was just hoping there'd be one that could
> wrap at the PTransform level but I realize now the PTransform abstraction
> is too general as you mentioned to do something like that.
>
> (2) will be likely what we do, though now I'm wondering if it might be
> possible to create a ParDo wrapper that can take a ParDo, extract its
> DoFn, wrap it, and return a new ParDo
>
> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> +1 to looking at composite transforms. You could even have a composite
>> transform that takes another transform as one of its construction arguments
>> and whose expand method does pre- and post-processing to the inputs/outputs
>> before/after applying the transform in question. (You could even implement
>> this as a Python decorator if you really wanted, either decorating the
>> expand method itself or the full class...)
>>
>> One of the difficulties is that for a general transform there isn't
>> necessarily a 1:N relationship between outputs and inputs as one has for a
>> DoFn (especially if there is any aggregation involved). There are, however,
>> two partial solutions that might help.
>>
>> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that
>> returns at most N elements. You could do this with a CombinePerKey if you
>> can come up with a reasonable key (e.g. the id of your input elements) that
>> the limit should be a applied to. Note that this may cause a lot of data to
>> be shuffled (though due to combiner lifting, no more than N per bundle).
>>
>> (2) You could have a DoFn that limits to N per bundle by initializing a
>> counter in its start_bundle and passing elements through until the counter
>> reaches a threshold. (Again, one could do this per id if one is available.)
>> It wouldn't stop production of the elements, but if things get fused it
>> would still likely be fairly cheap.
>>
>> Both of these could be prepended to the problematic consuming PTransform
>> as well.
>>
>> - Robert
>>
>>
>>
>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran 
>> wrote:
>>
>>> I'm aware of composite transforms and of the distributed nature of
>>> PTransforms. I'm not suggesting limiting the entire set and my example was
>>> more illustrative than the actual use case.
>>>
>>> My actual use case is basically: I have multiple PTransforms, and let's
>>> say most of them average ~100 generated outputs for a single input. Most of
>>> these PTransforms will occasionally run into an input though that might
>>> output maybe 1M outputs. This can cause issues if for example there are
>>> transforms that follow it that require a lot of compute per input.
>>>
>>> The simplest way to deal with this is to modify the `DoFn`s in our
>>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
>>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
>>> our transforms, but it'd be much cleaner if we could lift up this limiting
>>> logic out of the application logic and have some generic wrapper that
>>> extends our transforms.
>>>
>>> Thanks for the discussion!
>>>
>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 I don’t think it’s possible to extend in a way that you are asking
 (like, Java classes “*extend*"). Though, you can create your own
 composite PTransform that will incorporate one or several others inside
 *“expand()”* method. Actually, most of the Beam native PTransforms are
 composite transforms. Please, take a look on doc and examples [1]

 Regarding your example, please, be aware that all PTransforms are
 supposed to be executed in distributed environment and the order of records
 is not guaranteed. So, limiting the whole output by fixed number of records
 can be challenging - you’d need to make sure that it will be processed on
 only one worker, that means that you’d need to shuffle all your records by
 the same key and probably sort the records in way that you need.

 Did you consider to use “*org.apache.beam.sdk.transforms.Top*” for
 that? [2]

 If it doesn’t work for you, could you provide more details of your use
 case? Then we probably can propose the more suitable solutions for that.

 [1]
 https://beam.apache.org/documentation/programming-guide/#composite-transforms
 [2]
 https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html

 —
 Alexey

 On 15 Sep 2023, at 14:22, Joey Tran  wrote:

 Is there a way to extend already defined PTransforms? My question is
 probably 

Re: How can we get multiple side inputs from a single pipeline ?

2023-08-28 Thread Reuven Lax via user
This looks fine. One caveat: there currently appears to be a bug in Beam
when you apply a combiner followed by View.asSingleton. I would
recommend replacing these lines:

.apply(Latest.globally())
.apply(View.asSingleton())

With the following:
.apply(Reify.timestamps())
.apply(Combine.globally(Latest.combineFn()).asSingletonView())

On Mon, Aug 28, 2023 at 8:30 AM Sachin Mittal  wrote:

> Hi,
>
> I was checking the code for side input patterns :
>
> https://beam.apache.org/documentation/patterns/side-inputs/
> Basically I need multiple side inputs from a slowly updating global
> window side input.
>
> So as per example pipeline is something like this:
>
> PCollectionView<Map<String, String>> map =
>     p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
>         .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
>           @ProcessElement
>           public void process(@Element Long input, @Timestamp Instant timestamp,
>               OutputReceiver<Map<String, String>> o) {
>             o.output(/* output a map */);
>             // also output another map and a list, is this possible ?
>           }
>         }))
>         .apply(Window.<Map<String, String>>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>             .discardingFiredPanes())
>         .apply(Latest.globally()).apply(View.asSingleton());
>
>
> So as an extension of this example from the same DoFn which fetches the
> side input, alongside the map, I may also need another Map and another List.
> Reason I need to perform this in the same DoFn is that from this function
> we query external sources to get the side input and the other side inputs
> are also built from the same source.
>
> So I would like to avoid querying external sources multiple times to
> generate multiple side inputs from different DoFn and want to use the same
> function to generate multiple side inputs.
>
>  Can I achieve this by using  "Tags for multiple outputs" ?
>
> Thanks
> Sachin
>
>
>
>
>
>


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
Oh - in that case it's possible that the problem may be the direct runner's
implementation of the pubsub source - especially the watermark. For a
direct-runner test, I recommend using TestStream (which allows you to
advance the watermark manually, so you can test windowing).
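
A minimal sketch of what that looks like with the direct runner (assuming String elements and a 1-minute fixed window):

TestStream<String> events =
    TestStream.create(StringUtf8Coder.of())
        .addElements(TimestampedValue.of("a", new Instant(0)))
        .addElements(TimestampedValue.of("b", new Instant(30_000)))
        .advanceWatermarkTo(new Instant(60_000))   // closes the first 1-minute window
        .advanceWatermarkToInfinity();

PCollection<String> windowed =
    p.apply(events)
     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));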

On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar  wrote:

> I'm developing with direct runner. but should go to dataflow when
> deployed.
>
>
>  Original Message 
> On Apr 22, 2023, 13:13, Reuven Lax via user < user@beam.apache.org> wrote:
>
>
> What runner are you using to run this pipeline?
>
> On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar 
> wrote:
>
>> Same result:
>> PCollection result = p
>> .apply("Pubsub",
>> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
>> options.getProjectId(), subscription)))
>> .apply("Transform", ParDo.of(new MyTransformer()))
>> .apply("Windowing",
>> Window.into(FixedWindows.of(Duration.standardMinutes(1)))
>> .triggering(AfterWatermark.pastEndOfWindow()
>>
>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30
>> .withAllowedLateness(Duration.standardMinutes(1))
>> .discardingFiredPanes());
>>
>> PCollection insert = result.apply("Inserting",
>> JdbcIO.write()
>>
>> .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>> .withStatement("INSERT INTO person (first_name,
>> last_name) VALUES (?, 'doe')")
>> .withPreparedStatementSetter((element,
>> preparedStatement) -> {
>> log.info("Preparing statement to insert");
>> preparedStatement.setString(1, element);
>> })
>> .withResults()
>> );
>> result.apply(Wait.on(insert))
>>         .apply("Selecting", new SomeTransform())
>> .apply("PubsubMessaging", ParDo.of(new
>> NextTransformer()));
>> p.run();
>>
>> updated the github repo as wqell.
>>
>> --- Original Message ---
>> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>
>>
>> > The other problem you have here is that you have not set a window.
>> Wait.on waits for the end of the current window before triggering. The
>> default Window is the GlobalWindow, so as written Wait.on will wait for the
>> end of time (or until you drain the pipeline, which will also trigger the
>> GlobalWindow).
>> > Try adding a 1-minute fixed window to the results you read from PubSub.
>> >
>> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar 
>> wrote:
>> >
>> > > writeVoid() and write() plus withResults() return the same
>> PCollection AFAIK. In any case i updated the code and same thing
>> happens
>> > >
>> > > PCollection result = p.
>> > > apply("Pubsub",
>> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
>> options.getProjectId(), subscription)))
>> > > .apply("Transform", ParDo.of(new MyTransformer()));
>> > >
>> > > PCollection insert = result.apply("Inserting",
>> > > JdbcIO.write()
>> > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>> > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?,
>> 'doe')")
>> > > .withPreparedStatementSetter((element, preparedStatement) -> {
>> > > log.info("Preparing statement to insert");
>> > > preparedStatement.setString(1, element);
>> > > })
>> > > .withResults()
>> > > );
>> > > result.apply(Wait.on(insert))
>> > > .apply("Selecting", new SomeTransform())
>> > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> > >
>> https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>> > >
>> > > --- Original Message ---
>> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
>> user@beam.apache.org> wrote:
>> > >
>> > >

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
What runner are you using to run this pipeline?

On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar  wrote:

> Same result:
> PCollection result = p
> .apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> options.getProjectId(), subscription)))
> .apply("Transform", ParDo.of(new MyTransformer()))
> .apply("Windowing",
> Window.into(FixedWindows.of(Duration.standardMinutes(1)))
> .triggering(AfterWatermark.pastEndOfWindow()
>
> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30
> .withAllowedLateness(Duration.standardMinutes(1))
> .discardingFiredPanes());
>
> PCollection insert = result.apply("Inserting",
> JdbcIO.write()
>
> .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> .withStatement("INSERT INTO person (first_name,
> last_name) VALUES (?, 'doe')")
> .withPreparedStatementSetter((element,
> preparedStatement) -> {
> log.info("Preparing statement to insert");
> preparedStatement.setString(1, element);
> })
> .withResults()
> );
> result.apply(Wait.on(insert))
> .apply("Selecting", new SomeTransform())
> .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> p.run();
>
> updated the github repo as wqell.
>
> --- Original Message ---
> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>
> > The other problem you have here is that you have not set a window.
> Wait.on waits for the end of the current window before triggering. The
> default Window is the GlobalWindow, so as written Wait.on will wait for the
> end of time (or until you drain the pipeline, which will also trigger the
> GlobalWindow).
> > Try adding a 1-minute fixed window to the results you read from PubSub.
> >
> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar 
> wrote:
> >
> > > writeVoid() and write() plus withResults() return the same
> PCollection AFAIK. In any case i updated the code and same thing
> happens
> > >
> > > PCollection result = p.
> > > apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> options.getProjectId(), subscription)))
> > > .apply("Transform", ParDo.of(new MyTransformer()));
> > >
> > > PCollection insert = result.apply("Inserting",
> > > JdbcIO.write()
> > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?,
> 'doe')")
> > > .withPreparedStatementSetter((element, preparedStatement) -> {
> > > log.info("Preparing statement to insert");
> > > preparedStatement.setString(1, element);
> > > })
> > > .withResults()
> > > );
> > > result.apply(Wait.on(insert))
> > > .apply("Selecting", new SomeTransform())
> > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > >
> https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
> > >
> > > --- Original Message ---
> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
> user@beam.apache.org> wrote:
> > >
> > >
> > > > I believe you have to call withResults() on the JdbcIO transform in
> order for this to work.
> > > >
> > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar 
> wrote:
> > > >
> > > > > I hope you all are doing well. I am facing an issue with an Apache
> Beam pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
> > > > >
> > > > > PCollection result = p.
> > > > > apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > > > .apply("Transform", ParDo.of(new MyTransformer()));
> > > > >
> > > > > PCollection insert = result.apply("Inserting",
> > > > > J

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
The other problem you have here is that you have not set a window. Wait.on
waits for the end of the current window before triggering. The default
Window is the GlobalWindow, so as written Wait.on will wait for the end of
time (or until you drain the pipeline, which will also trigger the
GlobalWindow).

Try adding a 1-minute fixed window to the results you read from PubSub.

On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar  wrote:

> writeVoid() and write() plus withResults() return the same
> PCollection AFAIK. In any case i updated the code and same thing
> happens
>
>  PCollection result = p.
> apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> options.getProjectId(), subscription)))
> .apply("Transform", ParDo.of(new MyTransformer()));
>
> PCollection insert = result.apply("Inserting",
> JdbcIO.write()
>
> .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> .withStatement("INSERT INTO person (first_name,
> last_name) VALUES (?, 'doe')")
> .withPreparedStatementSetter((element,
> preparedStatement) -> {
> log.info("Preparing statement to insert");
> preparedStatement.setString(1, element);
> })
> .withResults()
> );
> result.apply(Wait.on(insert))
> .apply("Selecting", new SomeTransform())
> .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>
> https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>
> --- Original Message ---
> On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>
> > I believe you have to call withResults() on the JdbcIO transform in
> order for this to work.
> >
> > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar 
> wrote:
> >
> > > I hope you all are doing well. I am facing an issue with an Apache
> Beam pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
> > >
> > > PCollection result = p.
> > > apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > .apply("Transform", ParDo.of(new MyTransformer()));
> > >
> > > PCollection insert = result.apply("Inserting",
> > > JdbcIO.writeVoid()
> > > .withDataSourceProviderFn(/*...*/)
> > > .withStatement(/*...*/)
> > > .withPreparedStatementSetter(/*...*/)
> > > );
> > >
> > > result.apply(Wait.on(insert))
> > > .apply("Selecting", new SomeTransform())
> > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > p.run();
> > >
> > > In the code, I'm using the Wait.on transform to make the pipeline wait
> until the insert transform (which uses JdbcIO to write data) is completed
> before executing the next steps. However, the pipeline gets stuck and
> doesn't progress further.
> > >
> > > I've tried adding logging messages in my transforms to track the
> progress and identify where it's getting stuck, but I haven't been able to
> pinpoint the issue. I've searched for solutions online, but none of them
> provided a successful resolution for my problem.
> > >
> > > Can anyone provide any insights or suggestions on how to debug and
> resolve this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
> > >
> > > You can find the sample code at: https://github.com/j1cs/app-beam
> > >
> > > Thank you for your help and support.
> > >
> > > Best regards,
> > >
> > > Juan Cuzmar.
>


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
I believe you have to call withResults() on the JdbcIO transform in order
for this to work.

On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar  wrote:

> I hope you all are doing well. I am facing an issue with an Apache Beam
> pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
>
> PCollection result = p.
> apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> .apply("Transform", ParDo.of(new MyTransformer()));
>
> PCollection insert = result.apply("Inserting",
> JdbcIO.writeVoid()
> .withDataSourceProviderFn(/*...*/)
> .withStatement(/*...*/)
> .withPreparedStatementSetter(/*...*/)
> );
>
> result.apply(Wait.on(insert))
> .apply("Selecting", new SomeTransform())
> .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> p.run();
>
> In the code, I'm using the Wait.on transform to make the pipeline wait
> until the insert transform (which uses JdbcIO to write data) is completed
> before executing the next steps. However, the pipeline gets stuck and
> doesn't progress further.
>
> I've tried adding logging messages in my transforms to track the progress
> and identify where it's getting stuck, but I haven't been able to pinpoint
> the issue. I've searched for solutions online, but none of them provided a
> successful resolution for my problem.
>
> Can anyone provide any insights or suggestions on how to debug and resolve
> this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
>
> You can find the sample code at: https://github.com/j1cs/app-beam
>
> Thank you for your help and support.
>
> Best regards,
>
> Juan Cuzmar.
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-18 Thread Reuven Lax via user
Jeff - does setting the global default work for you, or do you need
per-operator control? Seems like the way to do it would be to add this to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw 
wrote:

> Yeah, I don't think we have a good per-operator API for this. If we were
> to add it, it probably belongs in ResourceHints.
>
> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax  wrote:
>
>> Looking at FlinkPipelineOptions, there is a parallelism option you can
>> set. I believe this sets the default parallelism for all Flink operators.
>>
>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:
>>
>>> Thanks Holden, this would work for Spark, but Flink doesn't have such
>>> kind of mechanism, so I am looking for a general solution on the beam side.
>>>
>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
>>> wrote:
>>>
>>>> To a (small) degree Sparks “new” AQE might be able to help depending on
>>>> what kind of operations Beam is compiling it down to.
>>>>
>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>
>>>>
>>>>
>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> I see. Robert - what is the story for parallelism controls on GBK with
>>>>> the Spark or Flink runners?
>>>>>
>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:
>>>>>
>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>>>>>>
>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>>> operators.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>
>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in operator
>>>>>>>>>> level. And the input size of the operator is unknown at compiling 
>>>>>>>>>> stage if
>>>>>>>>>> it is not a source
>>>>>>>>>>  operator,
>>>>>>>>>>
>>>>>>>>>> Here's an example of flink
>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>>>> and reduceByKey):
>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> The maximum parallelism is always determined by the parallelism
>>>>>>>>>>> of your data. If you do a GroupByKey for example, the number of 
>>>>>>>>>>> keys in
>>>>>>>>>>> your data determines the maximum parallelism.
>>>>>>>>>>>
>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>> automatically determine the parallelism (e.g. work will be 
>>>>>>>>>>> dynamically
>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the 
>>>>>>>>>>> parallelism of
>>>>>>>>>>> the execution.
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Besides the global parallelism of beam job, is there any way to
>>>>>>>>>>>> set parallelism for individual operators like group by and join? I
>>>>>>>>>>>> understand the parallelism setting depends on the underlying 
>>>>>>>>>>>> execution
>>>>>>>>>>>> engine, but it is very common to set parallelism like group by and 
>>>>>>>>>>>> join in
>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>


Re: Loosing records when using BigQuery IO Connector

2023-04-17 Thread Reuven Lax via user
What version of Beam are you using? There are no known data-loss bugs in
the connector, however if there has been a regression we would like to
address it with high priority.

On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van  wrote:

> Hi,
>
> I have a job that uses BigQuery IO Connector to write to a BigQuery table.
> When I test it with a small number of records (100) it works as expected
> but when I tested it with a larger number of records (1), I don’t see
> all of the records written to the output table but only part of it. It
> changes from run to run but no more than 1000 records as I have seen so far.
>
> There are WARNING log entries in the job log like this
>
> by client #0 failed with error, operations will be retried.
> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists: 
> ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>
> And one log entry like this
>
> Finalize of stream [stream-name] finished with row_count: 683
>
> If I sum all the numbers reported in the WARNING message with the one in
> the finalize of stream above, I get 1, which is exactly the number of
> input records.
>
> My pipeline uses 1 worker and it looks like this
>
> WriteResult writeResult = inputCollection.apply(
> "Save Events To BigQuery",
> BigQueryIO.write()
> .to(options.getTable())
> .withFormatFunction(TableRowMappers::toRow)
> .withMethod(Method.STORAGE_WRITE_API)
> 
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
> 
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
> .withExtendedErrorInfo());
>
> writeResult
> .getFailedStorageApiInserts()
> .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>
> There are no log entries for the failed inserts.
>
> Is there anything wrong with my pipeline code or is it a bug in BigQuery
> IO Connector?
>
> Thanks
> -Binh
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-17 Thread Reuven Lax via user
Looking at FlinkPipelineOptions, there is a parallelism option you can set.
I believe this sets the default parallelism for all Flink operators.
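
For example (sketch; the value is arbitrary and can also be passed as
--parallelism on the command line):

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setParallelism(8); // default parallelism applied to all Flink operators
Pipeline p = Pipeline.create(options);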

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:

> Thanks Holden, this would work for Spark, but Flink doesn't have such kind
> of mechanism, so I am looking for a general solution on the beam side.
>
> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
> wrote:
>
>> To a (small) degree Sparks “new” AQE might be able to help depending on
>> what kind of operations Beam is compiling it down to.
>>
>> Have you tried setting spark.sql.adaptive.enabled &
>> spark.sql.adaptive.coalescePartitions.enabled
>>
>>
>>
>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>
>>> I see. Robert - what is the story for parallelism controls on GBK with
>>> the Spark or Flink runners?
>>>
>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:
>>>
>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>
>>>>
>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>>>>
>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike Spark
>>>>> and Flink - dynamically modifies the parallelism as the operator runs, so
>>>>> there is no need to have such controls. In fact these specific controls
>>>>> wouldn't make much sense for the way Dataflow implements these operators.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:
>>>>>
>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>
>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:
>>>>>>>
>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in operator
>>>>>>>> level. And the input size of the operator is unknown at compiling 
>>>>>>>> stage if
>>>>>>>> it is not a source
>>>>>>>>  operator,
>>>>>>>>
>>>>>>>> Here's an example of flink
>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>> and reduceByKey):
>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> The maximum parallelism is always determined by the parallelism of
>>>>>>>>> your data. If you do a GroupByKey for example, the number of keys in 
>>>>>>>>> your
>>>>>>>>> data determines the maximum parallelism.
>>>>>>>>>
>>>>>>>>> Beyond the limitations in your data, it depends on your execution
>>>>>>>>> engine. If you're using Dataflow, Dataflow is designed to 
>>>>>>>>> automatically
>>>>>>>>> determine the parallelism (e.g. work will be dynamically split and 
>>>>>>>>> moved
>>>>>>>>> around between workers, the number of workers will autoscale, etc.), 
>>>>>>>>> so
>>>>>>>>> there's no need to explicitly set the parallelism of the execution.
>>>>>>>>>
>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Besides the global parallelism of beam job, is there any way to
>>>>>>>>>> set parallelism for individual operators like group by and join? I
>>>>>>>>>> understand the parallelism setting depends on the underlying 
>>>>>>>>>> execution
>>>>>>>>>> engine, but it is very common to set parallelism like group by and 
>>>>>>>>>> join in
>>>>>>>>>> both spark & flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
I see. Robert - what is the story for parallelism controls on GBK with the
Spark or Flink runners?

On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:

> No, I don't use dataflow, I use Spark & Flink.
>
>
> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>
>> Are you running on the Dataflow runner? If so, Dataflow - unlike Spark
>> and Flink - dynamically modifies the parallelism as the operator runs, so
>> there is no need to have such controls. In fact these specific controls
>> wouldn't make much sense for the way Dataflow implements these operators.
>>
>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:
>>
>>> Just for performance tuning like in Spark and Flink.
>>>
>>>
>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> What are you trying to achieve by setting the parallelism?
>>>>
>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:
>>>>
>>>>> Thanks Reuven, what I mean is to set the parallelism in operator
>>>>> level. And the input size of the operator is unknown at compiling stage if
>>>>> it is not a source
>>>>>  operator,
>>>>>
>>>>> Here's an example of flink
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>> and reduceByKey):
>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>
>>>>>
>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>>> The maximum parallelism is always determined by the parallelism of
>>>>>> your data. If you do a GroupByKey for example, the number of keys in your
>>>>>> data determines the maximum parallelism.
>>>>>>
>>>>>> Beyond the limitations in your data, it depends on your execution
>>>>>> engine. If you're using Dataflow, Dataflow is designed to automatically
>>>>>> determine the parallelism (e.g. work will be dynamically split and moved
>>>>>> around between workers, the number of workers will autoscale, etc.), so
>>>>>> there's no need to explicitly set the parallelism of the execution.
>>>>>>
>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:
>>>>>>
>>>>>>> Besides the global parallelism of beam job, is there any way to set
>>>>>>> parallelism for individual operators like group by and join? I
>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>> engine, but it is very common to set parallelism like group by and join 
>>>>>>> in
>>>>>>> both spark & flink.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
Are you running on the Dataflow runner? If so, Dataflow - unlike Spark and
Flink - dynamically modifies the parallelism as the operator runs, so there
is no need to have such controls. In fact these specific controls wouldn't
make much sense for the way Dataflow implements these operators.

On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:

> Just for performance tuning like in Spark and Flink.
>
>
> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> What are you trying to achieve by setting the parallelism?
>>
>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:
>>
>>> Thanks Reuven, what I mean is to set the parallelism in operator level.
>>> And the input size of the operator is unknown at compiling stage if it is
>>> not a source
>>>  operator,
>>>
>>> Here's an example of flink
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>> Spark also support to set operator level parallelism (see groupByKey
>>> and reduceByKey):
>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>
>>>
>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> The maximum parallelism is always determined by the parallelism of your
>>>> data. If you do a GroupByKey for example, the number of keys in your data
>>>> determines the maximum parallelism.
>>>>
>>>> Beyond the limitations in your data, it depends on your execution
>>>> engine. If you're using Dataflow, Dataflow is designed to automatically
>>>> determine the parallelism (e.g. work will be dynamically split and moved
>>>> around between workers, the number of workers will autoscale, etc.), so
>>>> there's no need to explicitly set the parallelism of the execution.
>>>>
>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:
>>>>
>>>>> Besides the global parallelism of beam job, is there any way to set
>>>>> parallelism for individual operators like group by and join? I
>>>>> understand the parallelism setting depends on the underlying execution
>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>> both spark & flink.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-15 Thread Reuven Lax via user
The maximum parallelism is always determined by the parallelism of your
data. If you do a GroupByKey for example, the number of keys in your data
determines the maximum parallelism.

Beyond the limitations in your data, it depends on your execution engine.
If you're using Dataflow, Dataflow is designed to automatically determine
the parallelism (e.g. work will be dynamically split and moved around
between workers, the number of workers will autoscale, etc.), so there's no
need to explicitly set the parallelism of the execution.

On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:

> Besides the global parallelism of beam job, is there any way to set
> parallelism for individual operators like group by and join? I
> understand the parallelism setting depends on the underlying execution
> engine, but it is very common to set parallelism like group by and join in
> both spark & flink.
>
>
>
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: major reduction is performance when using schema registry - KafkaIO

2023-04-09 Thread Reuven Lax via user
How are you using the schema registry? Do you have a code sample?

On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov  wrote:

> Hello,
>
> I am trying to understand the effect of schema registry on our pipeline's
> performance. In order to do sowe created a very simple pipeline that reads
> from kafka, runs a simple transformation of adding new field and writes of
> kafka.  the messages are in avro format
>
> I ran this pipeline with 3 different options on same configuration : 1
> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>
> * when i used apicurio as the schema registry i was able to process only
> 2000 messages per second
> * when i used confluent schema registry i was able to process 7000
> messages per second
> * when I did not use any schema registry and used plain avro
> deserializer/serializer i was able to process *30K* messages per second.
>
> I understand that using a schema registry may cause a reduction in
> performance but  in my opinion the difference is too high.
> Any comments or suggestions about these results?
>
> Thanks in advance
> Sigalit
>


Re: Successful Inserts for Storage Write API?

2023-03-21 Thread Reuven Lax via user
FYI, I just checked in WriteResult.getSuccessfulStorageApiInserts, which
should give you what you need. This is now checked into Beam HEAD, and
should be included in the next Beam release.
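
Usage will look roughly like this (sketch; input, MyRow, table, and the
downstream DoFn are placeholders):

WriteResult writeResult = input.apply("Write",
    BigQueryIO.<MyRow>write()
        .to(table)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        /* other settings as needed */);

writeResult
    .getSuccessfulStorageApiInserts() // rows that were committed to BigQuery
    .apply("AfterCheckpoint", ParDo.of(new ContinuePipelineFn()));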

On Fri, Mar 3, 2023 at 12:51 PM Matthew Ouyang 
wrote:

> I'm currently not using Wait.on.  I have a pipeline that uses BigQuery for
> checkpointing purposes.  I only want records that are
> successfully checkpointed in BigQuery to be eligible for the next stage in
> my pipeline.  With streaming inserts, I can use getSuccessfulInserts to
> achieve this and I was looking for something similar with Storage Write.
>
> On Thu, Mar 2, 2023 at 4:48 PM Reuven Lax via user 
> wrote:
>
>> Are you trying to do this in order to use Wait.on? getSuccessfulInserts
>> is not currently supported for Storage Write API.
>>
>> On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang 
>> wrote:
>>
>>> Thank you to Ahmed and Reuven for the tip on
>>> WriteResult::getFailedStorageApiInserts.
>>>
>>> When I tried to get the successful inserts through the Storage Write
>>> API, I received an error message saying that "Retrieving successful inserts
>>> is only supported for streaming inserts. Make sure
>>> withSuccessfulInsertsPropagation is correctly configured for
>>> BigQueryIO.Write object."  Did I make a mistake, or is there a reason why
>>> this is not possible?  I tried setting triggeringFrequency +
>>> numStorageWriteApiStreams as required by Storage Write, and I tried to set
>>> successfulInsertsPropagation as directed in the error message.
>>>
>>


Re: Why is FlatMap different from composing Flatten and Map?

2023-03-15 Thread Reuven Lax via user
In Apache Beam, Flatten is a union operation - it takes multiple
PCollections (of the same type) and merges them into a single PCollection.
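
A quick Java-SDK sketch of the two shapes (first, second, and nested are
assumed PCollections; the Java SDK also offers Flatten.iterables() for the
second shape):

// Union: several PCollection<String> -> one PCollection<String>
PCollection<String> merged =
    PCollectionList.of(first).and(second).apply(Flatten.pCollections());

// Flattening nested iterables: PCollection<Iterable<String>> -> PCollection<String>
PCollection<String> flattened = nested.apply(Flatten.<String>iterables());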

On Mon, Mar 13, 2023 at 11:32 AM Godefroy Clair 
wrote:

> Hi,
> I am wondering about the way `Flatten()` and `FlatMap()` are implemented
> in Apache Beam Python.
> In most functional languages, FlatMap() is the same as composing
> `Flatten()` and `Map()` as indicated by the name, so Flatten() and
> Flatmap() have the same input.
> But in Apache Beam, Flatten() is using _iterable of PCollections_ while
> FlatMap() is working with _PCollection of Iterables_.
>
> If I am not wrong, the signature of Flatten, Map and FlatMap are :
> ```
> Flatten:: Iterable[PCollections[A]] -> PCollection[A]
> Map:: (PCollection[A], (A-> B)) -> PCollection[B]
> FlatMap:: (PCollection[Iterable[A]], (A->B)) -> [A]
> ```
>
> So my question is is there another "Flatten-like" function  with this
> signature :
> ```
> anotherFlatten:: PCollection[Iterable[A]] -> PCollection[A]
> ```
>
> One of the reason this would be useful, is that when you just want to
> "flatten" a `PCollection` of `iterable` you have to use `FlatMap()`with an
> identity function.
>
> So instead of writing:
> `FlatMap(lambda e: e)`
> I would like to use a function
> `anotherFlatten()`
>
> Thanks,
> Godefroy
>


Re: Successful Inserts for Storage Write API?

2023-03-02 Thread Reuven Lax via user
Are you trying to do this in order to use Wait.on? getSuccessfulInserts is
not currently supported for Storage Write API.

On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang 
wrote:

> Thank you to Ahmed and Reuven for the tip on
> WriteResult::getFailedStorageApiInserts.
>
> When I tried to get the successful inserts through the Storage Write API,
> I received an error message saying that "Retrieving successful inserts is
> only supported for streaming inserts. Make sure
> withSuccessfulInsertsPropagation is correctly configured for
> BigQueryIO.Write object."  Did I make a mistake, or is there a reason why
> this is not possible?  I tried setting triggeringFrequency +
> numStorageWriteApiStreams as required by Storage Write, and I tried to set
> successfulInsertsPropagation as directed in the error message.
>


Re: Deduplicate usage

2023-03-02 Thread Reuven Lax via user
State is per-key, and keys are distributed across workers. Two workers
should not be working on the same state.
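
So no pre-shuffle is needed on your side; the keying happens inside the
transform. A minimal sketch (MyEvent is a placeholder; there are also
variants that dedupe on an extracted id and take a configurable duration):

PCollection<MyEvent> deduped =
    events.apply(Deduplicate.<MyEvent>values()); // drops repeats seen within the default duration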

On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van  wrote:

> Thank you Ankur,
>
> This is the current source code of Deduplicate transform.
>
>   Boolean seen = seenState.read();
>   // Seen state is either set or not set so if it has been set then it 
> must be true.
>   if (seen == null) {
> // We don't want the expiry timer to hold up watermarks.
> expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
> seenState.write(true);
> receiver.output(element);
>   }
>
> Could you please explain the synchronization for the following scenario?
>
>- There are two workers.
>- Both workers read the same state at the same time and the state was
>not set yet. In this case, both will get null in the response (I
>believe)
>- Both of them will try to set the state and send the output out.
>
> What will happen in this scenario?
>
> Thank you
> -Binh
>
> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka 
> wrote:
>
>> Hi Binh, The Deduplicate transform uses state api to do the
>> de-duplication which should do the needful operations to work across
>> multiple concurrent workers.
>>
>> Thanks,
>> Ankur
>>
>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>>
>>> Hi,
>>>
>>> I am writing a pipeline and want to apply deduplication. I look at
>>> Deduplicate transform that Beam provides and wonder about its usage. Do
>>> I need to shuffle input collection by key before calling this
>>> transformation? I look at its source code and it doesn’t do any shuffle so
>>> wonder how it works when let’s say there are duplicates and the duplicated
>>> elements are processed concurrently on multiple workers.
>>>
>>> Thank you
>>> -Binh
>>>
>>


Re: getFailedInsertsWithErr and Storage Write API

2023-03-01 Thread Reuven Lax via user
Correct, however if you are using a recent version of Beam you can call
WriteResult.getFailedStorageApiInserts
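
For example (sketch; the logging DoFn is a placeholder):

writeResult
    .getFailedStorageApiInserts() // the rows that failed, with their errors
    .apply("LogFailedInserts", ParDo.of(new LogInsertErrorFn()));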

On Wed, Mar 1, 2023 at 3:00 PM Matthew Ouyang 
wrote:

> The documentation says WriteResult.getFailedInserts won’t return anything
> when used with the Storage Write API (
> https://beam.apache.org/documentation/io/built-in/google-bigquery/) Is it
> the same for WriteResult.getFailedInsertsWithErr?
>


Re: Beam saves filepaths in Flink's state

2022-12-08 Thread Reuven Lax via user
This doesn't sound ideal to me. For contrast, Dataflow doesn't save any of
these things (coders, transforms, configs) in state, which makes it easier
for Dataflow to update pipelines.

On Thu, Dec 8, 2022 at 7:48 AM Cristian Constantinescu 
wrote:

> Hi everyone,
>
> I noticed that the Flink state contains KafkaIO's consumer config
> properties.
>
> When restarting the Beam pipeline (Java SDK) from state, the Flink
> Runner translation layer will deserialize the KafkaUnboudedReader (via
> UnboundedSourceWrapper) from Flink's state. This happens *before* the
> user written KafkaIO builder code is executed. Effectively what this
> means is that if the user has code that feeds KafkaIO correct file
> paths (probably fetched from configs), Beam still tries to use the
> ones that were saved in the Flink state and those may be outdated,
> hence preventing the pipeline from starting up properly.
>
> This is problematic if files get moved around on disk, or if we move
> the Flink state to another machine that may have different file
> configurations.
>
> Has anyone seen this problem before?
>
> Also, could anyone give me a quick overview of why Beam saves so many
> things in the Flink state (I'm aware of coders, transforms and
> transform configs) when those things can be materialized from the user
> code just like when the pipeline is started without a state. It would
> help me find a workaround for this issue.
>
> Thanks,
> Cristian
>


Re: Single side input to multiple transforms

2022-11-07 Thread Reuven Lax via user
Is this a Python job?

On Mon, Nov 7, 2022 at 12:38 AM Binh Nguyen Van  wrote:

> Hi,
>
> I am writing a pipeline where I have one singleton side input that I want
> to use in multiple different transforms. When I run the pipeline in Google
> Dataflow I see multiple entries in the logs that have a message like this
>
> Deduplicating side input tags, found non-unique side input key
> org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.:1204#4663620f501c9270
>
> Is this something that I should avoid? If so how can I do that?
>
> Thanks
> -Binh
>


Re: Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Reuven Lax via user
PCollections are usually persistent within a pipeline, so you can reuse
them in other parts of a pipeline with no problem.

There is no notion of state across pipelines - every pipeline is
independent. If you want state across pipelines you can write the
PCollection out to a set of files which are read back in by the new
pipeline.
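
For example (sketch; the path and record type are made up, and the record
type must be something AvroIO can encode):

// Pipeline 1: persist the PCollection.
results.apply("Save",
    AvroIO.write(MyRecord.class).to("gs://my-bucket/staging/results"));

// Pipeline 2 (a separate run): read it back.
PCollection<MyRecord> restored = p2.apply("Restore",
    AvroIO.read(MyRecord.class).from("gs://my-bucket/staging/results*"));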

On Tue, Oct 18, 2022 at 11:45 PM Ravi Kapoor  wrote:

> Hi Team,
> Can we stage a PCollection or  PCollection data? Lets say
> to save  the expensive operations between two complex BQ tables time and
> again and materialize it in some temp view which will be deleted after the
> session.
>
> Is it possible to do that in the Beam Pipeline?
> We can later use the temp view in another pipeline to read the data from
> and do processing.
>
> Or In general I would like to know Do we ever stage the PCollection.
> Let's say I want to create another instance of the same job which has
> complex processing.
> Does the pipeline re perform the computation or would it pick the already
> processed data in the previous instance that must be staged somewhere?
>
> Like in spark we do have notions of createOrReplaceTempView which is used
> to create temp table from a spark dataframe or dataset.
>
> Please advise.
>
> --
> Thanks,
> Ravi Kapoor
> +91-9818764564 <+91%2098187%2064564>
> kapoorrav...@gmail.com
>


Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
The default max file size is 4 TiB. BigQuery supports files up to 5 TiB, but
there might be some slop in our file-size estimation, which is why Beam sets
a slightly lower limit. In any case, you won't be able to increase that
value by much, or BigQuery will reject the load job.

The default max bytes per partition could perhaps be increased. When the code
was written, BigQuery's limit was 12 TiB, but if it's now 15 TiB that
would be a reason to raise it.

BigQuery does not provide guarantees on scheduling load jobs (especially if
you don't have reserved slots). Some other ideas for how to improve things:
- If you are running in streaming mode, then consider increasing the
triggering duration so you generate load jobs less often.
- By default, files are written out in json format. This is inefficient
and tends to create many more files. There is currently partial support for
writing files in a more-efficient AVRO format, but it requires you to call
withAvroWriter to pass in a function that converts your records into AVRO.
- I would also recommend trying the storage API write method. This does
not have the same issues with scheduling that load jobs have.
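
Putting a few of those knobs together (sketch only; MyRow, tableSpec, and the
values are illustrative, not recommendations):

BigQueryIO.Write<MyRow> write = BigQueryIO.<MyRow>write()
    .to(tableSpec)
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10)) // fewer, larger load jobs
    .withNumFileShards(100) // needed when a triggering frequency is set
    .withMaxBytesPerPartition(14L * 1024 * 1024 * 1024 * 1024); // nearer BigQuery's per-load cap
// Or sidestep load-job scheduling entirely with
// .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API).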

Reuven

On Thu, Sep 29, 2022 at 1:02 PM Julien Phalip  wrote:

> Hi all,
>
> Thanks for the replies.
>
> @Ahmed, you mentioned that one could hardcode another value
> for DEFAULT_MAX_FILE_SIZE. How may I do that from my own code?
>
> @Reuven, to give you more context on my use case: I'm running into an
> issue where a job that writes to BQ is taking an unexpectedly long time. It
> looks like things are slowing down on the BQ load job side of things. My
> theory is that the pipeline might generate too many BQ load job requests
> for BQ to handle in a timely manner. So I was thinking that this could be
> mitigated by increasing the file size, and therefore reducing the number of
> load job requests.
>
> That said, now that you've pointed at withMaxBytesPerPartition(), maybe
> that's what I should use instead? I see this defaults to 11TiB but perhaps
> I could try increasing it  to something closer to BQ's limit (15TiB)?
>
> Thanks,
>
> Julien
>
> On Thu, Sep 29, 2022 at 11:01 AM Ahmed Abualsaud via user <
> user@beam.apache.org> wrote:
>
>> That's right, if maxFileSize is made too small you may hit the default
>> maximum files per partition (10,000), in which case copy jobs will be
>> triggered. With that said though, BigQueryIO already has a public
>> withMaxBytesPerPartition() [1] method that controls the partition byte
>> size, which is arguably more influential in triggering this other codepath.
>>
>> [1]
>> https://github.com/apache/beam/blob/028c564b8ae1ba1ffa6aadb8212ec03555dd63b6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2623
>>
>> On Thu, Sep 29, 2022 at 12:24 PM Reuven Lax  wrote:
>>
>>> It's not public because it was added for use in unit tests, and
>>> modifying this value can have very unexpected results (e.g. making it
>>> smaller can trigger a completely different codepath that is triggered when
>>> there are too many files, leading to unexpected cost increases in the
>>> pipeline).
>>>
>>> Out of curiosity, what is your use case for needing to control this file
>>> size?
>>>
>>> On Thu, Sep 29, 2022 at 8:01 AM Ahmed Abualsaud <
>>> ahmedabuals...@google.com> wrote:
>>>
 Hey Julien,

 I don't see a problem with exposing that method. That part of the code
 was committed ~6 years ago, my guess is it wasn't requested to be public.

 One workaround is to hardcode another value for DEFAULT_MAX_FILE_SIZE [1].
 Would this work temporarily? @Chamikara Jayalath 
  @Reuven Lax  other thoughts?

 [1]
 https://github.com/apache/beam/blob/17453e71a81ba774ab451ad141fc8c21ea8770c9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L109

 Best,
 Ahmed

 On Wed, Sep 28, 2022 at 4:55 PM Julien Phalip 
 wrote:

> Hi,
>
> I'd like to control the size of files written to GCS when using
> BigQueryIO's FILE_LOAD write method.
>
> However, it looks like the withMaxFileSize method (
> https://github.com/apache/beam/blob/948af30a5b665fe74b7052b673e95ff5f5fc426a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2597)
> is not public.
>
> Is that intentional? Is there a workaround to control the file size?
>
> Thanks,
>
> Julien
>

 On Wed, Sep 28, 2022 at 4:55 PM Julien Phalip 
 wrote:

> Hi,
>
> I'd like to control the size of files written to GCS when using
> BigQueryIO's FILE_LOAD write method.
>
> However, it looks like the withMaxFileSize method (
> 

Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
It's not public because it was added for use in unit tests, and modifying
this value can have very unexpected results (e.g. making it smaller can
send the write down a completely different codepath that is used when there
are too many files, leading to unexpected cost increases in the pipeline).

Out of curiosity, what is your use case for needing to control this file
size?

On Thu, Sep 29, 2022 at 8:01 AM Ahmed Abualsaud 
wrote:

> Hey Julien,
>
> I don't see a problem with exposing that method. That part of the code was
> committed ~6 years ago, my guess is it wasn't requested to be public.
>
> One workaround is to hardcode another value for DEFAULT_MAX_FILE_SIZE [1].
> Would this work temporarily? @Chamikara Jayalath  
> @Reuven
> Lax  other thoughts?
>
> [1]
> https://github.com/apache/beam/blob/17453e71a81ba774ab451ad141fc8c21ea8770c9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L109
>
> Best,
> Ahmed
>
> On Wed, Sep 28, 2022 at 4:55 PM Julien Phalip  wrote:
>
>> Hi,
>>
>> I'd like to control the size of files written to GCS when using
>> BigQueryIO's FILE_LOAD write method.
>>
>> However, it looks like the withMaxFileSize method (
>> https://github.com/apache/beam/blob/948af30a5b665fe74b7052b673e95ff5f5fc426a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2597)
>> is not public.
>>
>> Is that intentional? Is there a workaround to control the file size?
>>
>> Thanks,
>>
>> Julien
>>
>
> On Wed, Sep 28, 2022 at 4:55 PM Julien Phalip  wrote:
>
>> Hi,
>>
>> I'd like to control the size of files written to GCS when using
>> BigQueryIO's FILE_LOAD write method.
>>
>> However, it looks like the withMaxFileSize method (
>> https://github.com/apache/beam/blob/948af30a5b665fe74b7052b673e95ff5f5fc426a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2597)
>> is not public.
>>
>> Is that intentional? Is there a workaround to control the file size?
>>
>> Thanks,
>>
>> Julien
>>
>


Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
If you are using an exactly-once runner, it will guarantee every message is
consumed once (though the mechanism might not be obvious).

Generally what happens is that the messages are consumed into the system in
order. However if you have downstream ParDos, there is no guarantee that
they process the messages in the same order (especially if there is a
shuffle operation, such as GroupByKey, in between).

Now a future version of the source might decide to split the Kafka
partition if it's too large to handle on one thread (e.g. split it in half
where the first half is bounded and the second half is the growing
unbounded tail of the partition). In this case the source would keep two
checkpoints for the current position in each half of the partition. (This
mode of operation probably wouldn't be compatible with checkpointing
offsets back to the broker, though.) The source doesn't do this today, I'm
just mentioning it to point out another way in which things could be
consumed out of order.

On Sun, Sep 25, 2022 at 11:40 AM Yomal de Silva 
wrote:

> Hi Reuven,
> Thanks for those clarifications.
>
> For the 4th question that I raised, if A gets failed and B is committed,
> will those messages(A) get consumed again from Kafka or will the messages
> get recovered from the checkpoint and retried in that specific operator?
>
> On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user 
> wrote:
>
>>
>>
>> On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have started using KafkaIO to read a data stream and have the
>>> following questions. Appreciate it if you could provide a few
>>> clarifications on the following.
>>>
>>>
>> 1. Does KafkaIO ignore the offset stored in the broker and uses the
>>> offset stored during checkpointing when consuming messages?
>>>
>>
>> Generally yes, as that's the only way to guarantee consistency (we can't
>> atomically commit to the runner and to Kafka). However when starting a new
>> pipeline, you should be able to start reading at the broker checkpoint.
>>
>>
>>> 2. How many threads will be used by the Kafka consumer?
>>>
>>
>> This depends somewhat on the runner, but you can expect one thread per
>> partition.
>>
>>
>>> 3. If the consumer polls a set of messages A and then later B while A is
>>> still being processed, is there a possibility of set B finishing before A?
>>> Does parallelism control this?
>>>
>>
>> yes. Beam doesn't currently have any notion of ordering. All messages are
>> independent and can be processed at different times (the source also
>> reserves the right to process different ranges of a single Kafka partition
>> on different threads, though it doesn't currently do this).
>>
>>
>>> 4. In the above scenario if B is committed back to the broker and
>>> somehow A failed, upon a restart is there any way we can consume A again
>>> without losing data?
>>>
>>
>> Data should never be lost. If B is processed, then you can assume that
>> the A data is checkpointed inside the Beam runner and will be processed to.
>>
>>
>>
>>>
>>> Thank you.
>>>
>>>
>>>
>>


Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva 
wrote:

> Hi all,
>
> I have started using KafkaIO to read a data stream and have the following
> questions. Appreciate it if you could provide a few clarifications on the
> following.
>
>
1. Does KafkaIO ignore the offset stored in the broker and uses the offset
> stored during checkpointing when consuming messages?
>

Generally yes, as that's the only way to guarantee consistency (we can't
atomically commit to the runner and to Kafka). However when starting a new
pipeline, you should be able to start reading at the broker checkpoint.
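
For the broker position to be useful on a fresh start, the offsets need to be
committed back to a consumer group, e.g. (sketch; broker, topic, and group
names are placeholders):

KafkaIO.<String, String>read()
    .withBootstrapServers("broker:9092")
    .withTopic("my-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(
        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
    .commitOffsetsInFinalize(); // commit to the broker once the runner has checkpointed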


> 2. How many threads will be used by the Kafka consumer?
>

This depends somewhat on the runner, but you can expect one thread per
partition.


> 3. If the consumer polls a set of messages A and then later B while A is
> still being processed, is there a possibility of set B finishing before A?
> Does parallelism control this?
>

Yes. Beam doesn't currently have any notion of ordering. All messages are
independent and can be processed at different times (the source also
reserves the right to process different ranges of a single Kafka partition
on different threads, though it doesn't currently do this).


> 4. In the above scenario if B is committed back to the broker and somehow
> A failed, upon a restart is there any way we can consume A again without
> losing data?
>

Data should never be lost. If B is processed, then you can assume that the
A data is checkpointed inside the Beam runner and will be processed too.



>
> Thank you.
>
>
>


Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-30 Thread Reuven Lax via user
Snapshots are expected to complete nearly instantaneously. Processing is
paused while the snapshot is in progress, but the pause should usually be
very brief. It's true that Dataflow does not support automated snapshots -
you would have to create them yourself on a schedule (e.g. from a cron job).

Checkpoints on Flink aren't simply an automated snapshot mechanism.
Checkpoints are how Flink implements consistent, exactly-once processing.
Dataflow on the other hand continuously checkpoints records, so doesn't
need global checkpoints for exactly-once processing.

Reuven

On Tue, Aug 30, 2022 at 5:10 AM Will Baker  wrote:

> I looked into snapshots and they do seem useful for providing a means
> to save state and resume, however they aren't as seamless as I was
> hoping for with the automatic checkpointing that is supported by other
> runners. It looked like snapshots would be user initiated and would
> pause the pipeline while the snapshot was being created. I could
> imagine how this would be set up on an automated schedule, but would
> still prefer something more light-weight like checkpoints.
>
> On Mon, Aug 29, 2022 at 8:11 PM Reuven Lax  wrote:
> >
> > Google Cloud Dataflow does support snapshots. Is this what you were
> looking for?
> >
> > On Mon, Aug 29, 2022 at 4:04 PM Kenneth Knowles  wrote:
> >>
> >> Hi Will, David,
> >>
> >> I think you'll find the best source of answer for this sort of question
> on the user@beam list. I've put that in the To: line with a BCC: to the
> dev@beam list so everyone knows they can find the thread there. If I have
> misunderstood, and your question has to do with building Beam itself, feel
> free to move it back.
> >>
> >> Kenn
> >>
> >> On Mon, Aug 29, 2022 at 2:24 PM Will Baker  wrote:
> >>>
> >>> Hello!
> >>>
> >>> I am wondering about using checkpoints with Beam running on Google
> >>> Cloud Dataflow.
> >>>
> >>> The docs indicate that checkpoints are not supported by Google Cloud
> >>> Dataflow:
> https://beam.apache.org/documentation/runners/capability-matrix/additional-common-features-not-yet-part-of-the-beam-model/
> >>>
> >>> Is there a recommended approach to handling checkpointing on Google
> >>> Cloud Dataflow when using streaming sources like Kinesis and Kafka, so
> >>> that a pipeline could be resumed from where it left off if it needs to
> >>> be stopped or crashes for some reason?
> >>>
> >>> Thanks!
> >>> Will Baker
>


Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-29 Thread Reuven Lax via user
Google Cloud Dataflow does support snapshots. Is this what you were looking
for?

On Mon, Aug 29, 2022 at 4:04 PM Kenneth Knowles  wrote:

> Hi Will, David,
>
> I think you'll find the best source of answer for this sort of question on
> the user@beam list. I've put that in the To: line with a BCC: to the
> dev@beam list so everyone knows they can find the thread there. If I have
> misunderstood, and your question has to do with building Beam itself, feel
> free to move it back.
>
> Kenn
>
> On Mon, Aug 29, 2022 at 2:24 PM Will Baker  wrote:
>
>> Hello!
>>
>> I am wondering about using checkpoints with Beam running on Google
>> Cloud Dataflow.
>>
>> The docs indicate that checkpoints are not supported by Google Cloud
>> Dataflow:
>> https://beam.apache.org/documentation/runners/capability-matrix/additional-common-features-not-yet-part-of-the-beam-model/
>>
>> Is there a recommended approach to handling checkpointing on Google
>> Cloud Dataflow when using streaming sources like Kinesis and Kafka, so
>> that a pipeline could be resumed from where it left off if it needs to
>> be stopped or crashes for some reason?
>>
>> Thanks!
>> Will Baker
>>
>


Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
That would be a nice feature, though maybe some work to implement.

On Thu, Aug 4, 2022 at 2:49 PM Brian Hulette  wrote:

> In some places (e.g. in AutoValueSchema) we assume that nested
> schema-inferred types are of the same "class". I filed [1] to track this a
> while back - I think we should support mixing and matching SchemaProviders
> for nested types.
>
> [1] https://github.com/apache/beam/issues/20359
>
> On Thu, Aug 4, 2022 at 2:45 PM Reuven Lax via user 
> wrote:
>
>> We do have JavaBeanSchema which might work, depending on whether your
>> thrift class conforms to java beans.
>>
>> On Thu, Aug 4, 2022 at 2:06 PM Binh Nguyen Van 
>> wrote:
>>
>>> Hi,
>>>
>>> I have an AutoValue class and it looks like this
>>>
>>> @AutoValue
>>> @DefaultSchema( AutoValueSchema.class )
>>> public abstract class MyClass {
>>> public abstract String getField1();
>>> public abstract MyThriftClass getField2();
>>> public static Builder Builder() {
>>> return new AutoValue_MyClass.Builder();
>>> }
>>>
>>> @AutoValue.Builder
>>> public static abstract class Builder() {
>>> public abstract Builder setField1(String field1);
>>> public abstract Builder setField2(MyThriftClass field2);
>>> public abstract MyClass build();
>>> }
>>> }
>>>
>>> MyThriftClass is not an AutoValue class and it inherits from
>>> org.apache.thrift.TBase class.
>>>
>>> When I run a pipeline with a PCollection of elements that are instances
>>> of this class, I got this error java.lang.IllegalStateException:
>>> AutoValue generated class not found: com.foo.bar.AutoValue_MyThriftClass
>>> .
>>>
>>> My question is, is it possible to use a non-AutoValue member in an
>>> AutoValue class like what I am doing now? If yes then how can I do it? If
>>> no then what would be the alternatives?
>>>
>>> Thank you
>>>
>>> -Binh
>>>
>>>
>>>


Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
We do have JavaBeanSchema which might work, depending on whether your
thrift class conforms to java beans.
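
Roughly, JavaBeanSchema infers the schema from getter/setter pairs, so the
generated class would need to look something like this (illustrative sketch,
not the actual thrift-generated code):

@DefaultSchema(JavaBeanSchema.class)
public class MyBean {
  private String field1;

  public String getField1() { return field1; }
  public void setField1(String field1) { this.field1 = field1; }
}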

On Thu, Aug 4, 2022 at 2:06 PM Binh Nguyen Van  wrote:

> Hi,
>
> I have an AutoValue class and it looks like this
>
> @AutoValue
> @DefaultSchema( AutoValueSchema.class )
> public abstract class MyClass {
> public abstract String getField1();
> public abstract MyThriftClass getField2();
> public static Builder Builder() {
> return new AutoValue_MyClass.Builder();
> }
>
> @AutoValue.Builder
> public static abstract class Builder() {
> public abstract Builder setField1(String field1);
> public abstract Builder setField2(MyThriftClass field2);
> public abstract MyClass build();
> }
> }
>
> MyThriftClass is not an AutoValue class and it inherits from
> org.apache.thrift.TBase class.
>
> When I run a pipeline with a PCollection of elements that are instances of
> this class, I got this error java.lang.IllegalStateException: AutoValue
> generated class not found: com.foo.bar.AutoValue_MyThriftClass.
>
> My question is, is it possible to use a non-AutoValue member in an
> AutoValue class like what I am doing now? If yes then how can I do it? If
> no then what would be the alternatives?
>
> Thank you
>
> -Binh
>
>
>


Re: GroupIntoBatches not working on Flink?

2022-07-26 Thread Reuven Lax via user
This might be a bug in the Flink runner, because it is implemented here.

On Tue, Jul 26, 2022 at 9:14 AM Cristian Constantinescu 
wrote:

> Hi everyone,
>
> Quick question about GroupIntoBatches.
>
> When running on Flink, eventually it hits an unsupported
> exception "Canceling a timer by ID is not yet supported." on this line [1].
> The source inputs are AVRO files for testing (batch) but will use kafka
> topics (streaming) when deployed.
>
> This happens when the batch is filled (10 items) and the max buffering
> time timer needs to be cancelled.
>
> Anyone else observed this issue?
>
> On a related note. Looks like the FlinkStatefulDoFnFunction [2] uses
> InMemoryTimerInternals. Curious why is FlinkTimerInternals not used? I
> would guess there's a difference between batch and streaming requirements?
>
> Thank you,
> Cristian
>
>
> [1]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L157
> [2]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java#L136
> [3]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1314
>


Re: Generating Hearbeats Using Looping Timer

2022-07-09 Thread Reuven Lax via user
On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra 
wrote:

> Maybe the previous post was too verbose so I will try to summarize my
> question -
> If one instance of DoFn tries to set a timer for a time which is behind
> the pipeline's watermark, can this cause the pipeline to stall for other
> keys as well?
> "stall" meaning here - other keys' timers will start lagging behind.
>

It depends on the runner, but in general timers should be independent.
However, as a practical matter, every worker has only so many processing
threads, and timers are processed in order, so if a large number of these
"old" timers are set and they take a long time to process, this could cause
some delays.


> say there are 1 million DoFns running in a steady state(behaving as
> expected), where timers are firing at 5 min boundaries.
>

Do you mean 1 million keys? What do you mean by 1 million DoFns?


> 1 bad key comes which sets its timer to a time which is 1 hour older than
> the current watermark. What happens here? my understanding here is this -
>  the looping timer will fire back to back in quick succession for this bad
> key 12 times and after that this key also joins the group of 1 million keys
> which were firing regularly at 5 min boundaries.
>

Where does the number 12 come from?


> PS - Above DoFn is using default Global Windows and default trigger.
>
>
> On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <
> gauravmishra.it...@gmail.com> wrote:
>
>> Hello,
>> I have a pipeline which is generating heartbeats using looping timers in
>> a stateful dofn. Following is pseudo code for the process element and
>> onTimer methods
>>
>> StateSpec> lastSeenMsg = StateSpecs.value(...);
>> TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>
>> processElemnt(input) {
>> // read event time from the message
>> Instant currentEventTime = input.getEventTimeEpoc();
>> if(input.state == ONLINE) {
>>lastSeenMsg.write(input);
>>// calculate start of looping timer
>>// which will be next 5 min boundary
>>long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
>>long offset = currentEventTimeEpocSeconds % 300;
>>long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
>>loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>> }
>> else {
>>  // stop hearbeats when entity offline
>>   loopingTimer.clear();
>>}
>> }
>>
>>
>> onTimer() {
>> // emit out the lastSeenMsg
>> output(lastSeenMsg.read());
>>
>>
>> loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
>> }
>>
>>
>> The above pipeline works well in low load scenarios. But on one of my
>> heavy traffic deployment the pipeline seems to be not able to keep up with
>> the load. Input msg from pubsub are state change events for an entity -
>>  Entity Online or Entity Offline messages. Once a entity comes Online we
>> start generating heartbeat every 5 min as long as we do not encounter
>> Offline message for that entity. Number of online entities can be fairly
>> large, more than 10 Million entities can be Online at a given time.
>>
>> I am seeing this particular DoFn starts lagging behind as soon as it gets
>> started. The timers are firing pretty late. The lag went up to 48 hours
>> before I restarted the pipeline. Is there something wrong in what I am
>> doing.
>> Note - I am reading the eventTime embedded in the msg. Intent for this is
>> fire a bunch of timers in quick succession if needed and fill up the DB
>> with heartbeats till current time.
>> So say a msg comes with state = Online and time = 10.02 AM. and current
>> watermark is at 10.13AM.  I set the loopingTimer to start at 10:05, which i
>> expect to fire immediately since the watermark is already ahead of this
>> time? (Or this is wrong understanding). Similarly the subsequent call to
>> onTimer method will set next timer to fire at 10:10 and that I also expect
>> to fire immediately. After this point this DoFn should start emitting at
>> same time with all other instances of this DoFn. Is there a mistake in this
>> implementaion?
>> Another thing I am noticing is that this pipeline is running a single
>> dataflow worker and not scaling up automatically. For such a large key
>> space (10 million DoFns and their timers) i expected the pipeline to use a
>> lot of CPU near the 5 minute boudaries and scale up but that is also not
>> happening.
>>
>