Re: Using unbounded source as a side input for a DoFn

2022-07-20 Thread Cristian Constantinescu
Disclaimer: I am not an expert, but I kinda worked on something similar.

A few points I'd like to bring up:
- Side inputs do not trigger the processElement function when new elements
are added to the side input. If the side input doesn't yet contain the item
you need at the time a main-input element is processed, too bad, unless you
use timers to reprocess that element later, when the side input might have
more data.

- If your goal is to somehow combine two PCollections, I would suggest you
look into CoGroupByKey [1] and its schema-aware brother CoGroup [2]. You can
then use a global window with a trigger that fires (roughly) on every new
element, e.g. Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()).

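- The input of a PTransform can also be a PCollectionTuple. A ParDo can't
read a PCollectionTuple directly, though, so inside expand() you label each
element with the collection it came from, Flatten both collections into one
PCollection, and then in your inner ParDo you handle items from either
collection. Something like this (both inputs are assumed to be
KV<String, String> so the DoFn can use per-key state):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;

class FooTransform extends PTransform<PCollectionTuple, PCollection<String>> {
  // Assumes both inputs are KV<String, String>. The curly brackets are important:
  // they create an anonymous subclass so Beam can capture the generic type.
  // Callers build the input with PCollectionTuple.of(A_TAG, a).and(B_TAG, b).
  static final TupleTag<KV<String, String>> A_TAG = new TupleTag<KV<String, String>>() {};
  static final TupleTag<KV<String, String>> B_TAG = new TupleTag<KV<String, String>>() {};

  @Override
  public PCollection<String> expand(PCollectionTuple input) {
    return PCollectionList.of(input.get(A_TAG).apply("LabelA", label("A")))
        .and(input.get(B_TAG).apply("LabelB", label("B")))
        .apply(Flatten.pCollections())
        .apply(ParDo.of(new FooDoFn()));
  }

  // KV<key, value> -> KV<key, KV<source, value>>, so FooDoFn knows where an item came from.
  private static MapElements<KV<String, String>, KV<String, KV<String, String>>> label(String source) {
    return MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(),
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())))
        .via((KV<String, String> kv) -> KV.of(kv.getKey(), KV.of(source, kv.getValue())));
  }

  static class FooDoFn extends DoFn<KV<String, KV<String, String>>, String> {
    @StateId("bValues")
    private final StateSpec<BagState<String>> bValuesSpec = StateSpecs.bag(StringUtf8Coder.of());

    @ProcessElement
    public void processElement(ProcessContext ctx, @StateId("bValues") BagState<String> bValues) {
      KV<String, String> item = ctx.element().getValue();
      if ("A".equals(item.getKey())) {
        // logic for items from A; placeholder: pair it with every B value seen for this key
        for (String b : bValues.read()) {
          ctx.output(item.getValue() + "," + b);
        }
      } else {
        // items from B: adding them to a state variable effectively makes B
        // an unbounded (per-key) side input
        bValues.add(item.getValue());
      }
    }
  }
}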
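For the first two points, here is a minimal Beam Java sketch, assuming String and KV<String, String> element types. All names (UnboundedJoinSketches, globalRefiringWindow, asGrowingListView, LookupFn, coGroupContinuously) are hypothetical, and it has not been checked against Pub/Sub or any particular runner. asGrowingListView turns an unbounded PCollection into a List side input in the global window that grows as panes fire; LookupFn shows that a main-input element only sees the panes fired so far. coGroupContinuously is the CoGroupByKey alternative with the repeating trigger.

```java
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical helper class; names and element types are illustrative only.
public class UnboundedJoinSketches {

  // Global window whose trigger fires shortly after the first element of every
  // new pane, forever. Accumulating panes: each firing holds everything seen so far.
  static <T> Window<T> globalRefiringWindow() {
    return Window.<T>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .accumulatingFiredPanes();
  }

  // Point 1: an unbounded PCollection as a List side input that grows as panes fire.
  public static PCollectionView<List<String>> asGrowingListView(PCollection<String> unbounded) {
    return unbounded
        .apply("RefiringGlobalWindow", UnboundedJoinSketches.<String>globalRefiringWindow())
        .apply("AsList", View.asList());
  }

  // Main-input DoFn that emits the key when its value is present in the side input.
  public static class LookupFn extends DoFn<KV<String, String>, String> {
    private final PCollectionView<List<String>> sideView;

    public LookupFn(PCollectionView<List<String>> sideView) {
      this.sideView = sideView;
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
      // Only the side-input panes that have fired so far are visible here.
      List<String> seenSoFar = ctx.sideInput(sideView);
      if (seenSoFar.contains(ctx.element().getValue())) {
        ctx.output(ctx.element().getKey());
      }
      // Otherwise nothing is emitted, and a later side-input update does NOT
      // re-invoke processElement for this element.
    }
  }

  // Point 2: CoGroupByKey in the global window; each firing re-emits a key's
  // CoGbkResult with every value from both inputs seen so far.
  public static PCollection<KV<String, CoGbkResult>> coGroupContinuously(
      PCollection<KV<String, String>> a, TupleTag<String> aTag,
      PCollection<KV<String, String>> b, TupleTag<String> bTag) {
    return KeyedPCollectionTuple
        .of(aTag, a.apply("WindowA", UnboundedJoinSketches.<KV<String, String>>globalRefiringWindow()))
        .and(bTag, b.apply("WindowB", UnboundedJoinSketches.<KV<String, String>>globalRefiringWindow()))
        .apply(CoGroupByKey.create());
  }
}
```

Wiring it up would look roughly like mainInput.apply(ParDo.of(new UnboundedJoinSketches.LookupFn(view)).withSideInputs(view)). Because the panes accumulate, both the side-input list and each key's CoGbkResult keep growing for the life of the pipeline.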

Hope it helps,
Cristian

[1] https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/
[2] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html

On Thu, Jul 21, 2022 at 1:45 AM Sahil Modak 
wrote:

> Hi,
>
> We are looking to use the side input feature for one of our DoFns. The
> side input has to be a PCollection constructed from a subscription using
> PubsubIO.read.
>
> We want our primary DoFn, which operates on KV pairs in a global window, to
> access this side input.
> The goal is to have all the messages of this unbounded source (the side
> input) available to every KV pair processed by the DoFn that uses this side
> input.
>
> Is it possible to have an unbounded source (like pubsub) as a side input?
>
> Thanks,
> Sahil
>


Re: Using unbounded source as a side input for a DoFn

2022-07-20 Thread Reuven Lax via dev
How do you want to use the side input?

On Wed, Jul 20, 2022 at 10:45 PM Sahil Modak 
wrote:

> Hi,
>
> We are looking to use the side input feature for one of our DoFns. The
> side input has to be a PCollection constructed from a subscription using
> PubsubIO.read.
>
> We want our primary DoFn, which operates on KV pairs in a global window, to
> access this side input.
> The goal is to have all the messages of this unbounded source (the side
> input) available to every KV pair processed by the DoFn that uses this side
> input.
>
> Is it possible to have an unbounded source (like pubsub) as a side input?
>
> Thanks,
> Sahil
>


Using unbounded source as a side input for a DoFn

2022-07-20 Thread Sahil Modak
Hi,

We are looking to use the side input feature for one of our DoFns. The side
input has to be a PCollection constructed from a subscription using
PubsubIO.read.

We want our primary DoFn, which operates on KV pairs in a global window, to
access this side input.
The goal is to have all the messages of this unbounded source (the side
input) available to every KV pair processed by the DoFn that uses this side
input.

Is it possible to have an unbounded source (like pubsub) as a side input?

Thanks,
Sahil