Re: Consuming one PCollection before consuming another with Beam

2023-03-01 Thread Reuven Lax via dev
I'm not sure I understand this use case well. What are you planning on
doing with the BQ dataset if it were processed first? Were you planning on
caching information in memory? Storing data in Beam state? Something else?

On Wed, Mar 1, 2023 at 10:43 AM Kenneth Knowles  wrote:


Re: Consuming one PCollection before consuming another with Beam

2023-03-01 Thread Kenneth Knowles
On Tue, Feb 28, 2023 at 5:14 PM Sahil Modak 
wrote:

> The number of keys/data in BQ would not be constant and will grow with time.
>
> A rough estimate would be around 300k keys with an average size of 5 KB per
> key. Both the count of the keys and the size of each key would be
> feature-dependent (based on the upstream pipelines) and we won't have
> control over this in the future.
>
> Using the BigQuery client would mean we would have to run individual queries
> for each of these 300k keys from the BusinessLogic() dofn which operates in
> a global window KV
>
> Also, the order of the data from BQ would not matter to us since the only
> thing we are trying to solve here is regaining the state spec information
> before starting to consume pub/sub.
>

I was referring to order in general, across your whole data set as an
abstract concept. If order _really_ doesn't matter, then you wouldn't need
to read the BQ data first. You could just flatten them together and run the
pipeline like that. So I think there is some order-dependence that you want
to represent at the data level.

Kenn


> I will explore using Wait.on(bigquery) before pub/sub read since I am not
> sure if side input would be the best option here.
>

Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Niel Markwick via dev
Regarding ordering: anything that requires inputs to be in a specific
order in Beam will be problematic due to the nature of parallel processing;
you will always get race conditions.

Assuming you are still intending to Flatten the BigQuery and Pub/Sub
PCollections, using Wait.on() before flattening the two PCollections will not
make much difference, as there is still a strong likelihood that the BQ and
Pub/Sub records will be interleaved in the BigQuery output.
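A minimal sketch of the Wait.on() gating being discussed, assuming both inputs have already been converted to PCollection<String> (the class and method names are hypothetical). Wait.on(signal) releases all elements of a main-input window only once the matching window of the signal is closed; in the global window with a bounded BigQuery signal, Pub/Sub elements are therefore held back until the BigQuery read has finished emitting. The Pub/Sub read itself still runs, and nothing here guarantees that BusinessLogic() has already processed the BigQuery rows, which is the race described above.

```java
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class WaitOnSketch {

  /**
   * Gates the Pub/Sub input on completion of the BigQuery read, then flattens both.
   * Elements downstream of the Flatten are still unordered: BigQuery rows may still be
   * in flight (or not yet processed by the next DoFn) when Pub/Sub elements are released.
   */
  public static PCollection<String> gateAndMerge(
      PCollection<String> bqJson, PCollection<String> pubsubJson) {
    // Each Pub/Sub window is emitted only after the same window of bqJson is closed.
    PCollection<String> gatedPubsub =
        pubsubJson.apply("Wait for BQ read", Wait.<String>on(bqJson));
    return PCollectionList.of(bqJson)
        .and(gatedPubsub)
        .apply("Merge Input", Flatten.pCollections());
  }
}
```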

If the BQ read is to update internal state, then I assume that you need to
store that state somewhere in your BusinessLogic DoFn(). If this storage
is in RAM, then all worker instances of your BusinessLogic DoFn() will
need to have access to all records of that BQ read, and the only way to do
this is through a side input. If you are sending this data in the
normal input and you have multiple workers, each worker will only get some,
not all, of the BQ records, so each worker's internal state would be
inconsistent.

> Using big query client would mean we would have to run individual
> queries for each of these 300k keys from the BusinessLogic() dofn which
> operates in a global window KV

Or read all the BigQuery records at once on BusinessLogic() startup and
store them in the internal state ... which ends up being the same as using
a side input.
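A minimal sketch of that startup-load alternative, assuming a hypothetical serializable SnapshotLoader (for example, one that runs a single BigQuery query for all keys; no real client call is shown). Each DoFn instance loads its own full copy in @Setup, which is why this ends up equivalent to a side input: with the ~300k keys of ~5 KB each estimated above, every instance would hold that whole snapshot in memory. The map is read-only; nothing here writes updates back to Beam state or to BigQuery.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class SetupLoadedLogic extends DoFn<KV<String, String>, KV<String, String>> {

  /** Hypothetical loader, e.g. one BigQuery query returning key -> stored state. */
  public interface SnapshotLoader extends Serializable {
    Map<String, String> loadAll();
  }

  private final SnapshotLoader loader;

  // Rebuilt on each DoFn instance; not serialized with the DoFn.
  private transient Map<String, String> snapshot;

  public SetupLoadedLogic(SnapshotLoader loader) {
    this.loader = loader;
  }

  @Setup
  public void setup() {
    // Runs once per DoFn instance, so every instance holds the full snapshot in memory.
    snapshot = new HashMap<>(loader.loadAll());
  }

  @ProcessElement
  public void processElement(
      @Element KV<String, String> event, OutputReceiver<KV<String, String>> out) {
    String restored = snapshot.get(event.getKey());
    out.output(KV.of(event.getKey(), combine(restored, event.getValue())));
  }

  /** Hypothetical business rule: append the new event to the restored state. */
  static String combine(String restored, String incoming) {
    return restored == null ? incoming : restored + "|" + incoming;
  }
}
```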


-- 

Niel Markwick
Cloud Solutions Architect
Google Belgium
ni...@google.com
+32 2 894 6771




On Wed, 1 Mar 2023 at 02:15, Sahil Modak via dev 
wrote:

> The number of keys/data in BQ would not be constant and grow with time.
>
> A rough estimate would be around 300k keys with an average size of 5kb per
> key. Both the count of the keys and the size of the key would be feature
> dependent (based on the upstream pipelines) and we won't have control over
> this in the future.
>
> Using big query client would mean we would have to run individual queries
> for each of these 300k keys from the BusinessLogic() dofn which operates in
> a global window KV
>
> Also, the order of the data from BQ would not matter to us since the only
> thing we are trying to solve here is regaining the state spec information
> before starting to consume pub/sub.
>
> I will explore using Wait.on(bigquery) before pub/sub read since I am not
> sure if side input would be the best option here.
>
>
> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:
>
>> I'm also curious how much you depend on order to get the state contents
>> right. The ordering of the side input will be arbitrary, and even the
>> streaming input can have plenty of out of order messages. So I want to
>> think about what are the data dependencies that result in the requirement
>> of order. Or if there are none and you just want to know that all the past
>> data has been processed, Niel's idea is one solution. It isn't parallel,
>> though.
>>
>> Kenn
>>
>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:
>>
>>> How large is this state spec stored in BQ? If the size isn't too large,
>>> you can read it from BQ and make it a side input into the DoFn.
>>>
>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
 We are trying to re-initialize our state specs in the BusinessLogic()
 DoFn from BQ.
 BQ has data about the state spec, and we would like to make sure that
 the state specs in our BusinessLogic() dofn are initialized before it
 starts consuming the pub/sub.

 This is for handling the case of redeployment of the dataflow jobs so
 that the states are preserved and the BusinessLogic() can work seamlessly
 as it was previously. All our dofns are operating in a global window and do
 not perform any aggregation.

 We are currently using Redis to preserve the state spec information but
 would like to explore using BQ as an alternative to Redis.

 On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
 wrote:

> My suggestion is to try to solve the problem in terms of what you want
> to compute. Instead of trying to control the operational aspects like 
> "read
> all the BQ before reading Pubsub" there is presumably some reason that the
> BQ data naturally "comes first", for example if its timestamps are earlier
> or if there is a join or an aggregation that must include it. Whenever you
> think you want to set up an operational dependency between two things that
> "happen" in a pipeline, it is often best to pivot your thinking to the 
> data
> and what you are trying to 

Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Sahil Modak via dev
The number of keys/data in BQ would not be constant and will grow with time.

A rough estimate would be around 300k keys with an average size of 5 KB per
key. Both the count of the keys and the size of each key would be
feature-dependent (based on the upstream pipelines) and we won't have control
over this in the future.

Using the BigQuery client would mean we would have to run individual queries
for each of these 300k keys from the BusinessLogic() dofn which operates in
a global window KV

Also, the order of the data from BQ would not matter to us since the only
thing we are trying to solve here is regaining the state spec information
before starting to consume pub/sub.

I will explore using Wait.on(bigquery) before pub/sub read since I am not
sure if side input would be the best option here.
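For scale, the estimate above works out roughly as follows, reading "5kb" as 5 KB = 1,000 bytes (with 5 KiB the total is about 1.43 GiB). This is the order of magnitude a side input, or an in-memory copy per worker, would have to hold at the current key count, before any growth.

```latex
300{,}000\ \text{keys} \times 5\ \tfrac{\text{KB}}{\text{key}}
  = 1{,}500{,}000\ \text{KB}
  = 1.5 \times 10^{9}\ \text{bytes}
  \approx 1.5\ \text{GB}
```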


On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:


Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Kenneth Knowles
I'm also curious how much you depend on order to get the state contents
right. The ordering of the side input will be arbitrary, and even the
streaming input can have plenty of out-of-order messages. So I want to
think about what are the data dependencies that result in the requirement
of order. Or if there are none and you just want to know that all the past
data has been processed, Niel's idea is one solution. It isn't parallel,
though.

Kenn

On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:

> How large is this state spec stored in BQ? If the size isn't too large,
> you can read it from BQ and make it a side input into the DoFn.

Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Jan Lukavský
In the case that the data is too large for a side input, you could do the
same by reassigning timestamps of the BQ input to
BoundedWindow.TIMESTAMP_MIN_VALUE (you would have to do that in a
stateful DoFn with a timer whose outputTimestamp is set to
TIMESTAMP_MIN_VALUE to hold the watermark, or using a splittable DoFn, or, if
the BQ read allows you to specify a timestamp function, by using that directly).


In your BusinessLogic() you would set a timer that waits for the
watermark to move (e.g. timer.offset(Duration.millis(1)).setRelative()) and
buffer everything until the timer fires. Because the input from BQ is
bounded, its watermark will eventually advance to TIMESTAMP_MAX_VALUE, which
will fire the timer and flush the buffer.
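A minimal sketch of this buffering step, assuming (as described above) that the BigQuery elements already carry BoundedWindow.TIMESTAMP_MIN_VALUE and hold the watermark until that read completes, and that both inputs are keyed by the same String key and flattened before this DoFn. The class name and the step() rule are hypothetical stand-ins for BusinessLogic(). The timer is set only for the first buffered element of each key, so steady Pub/Sub traffic cannot keep pushing it later; when it fires, the buffer is replayed oldest-first, which puts the MIN_VALUE-stamped BigQuery rows ahead of the Pub/Sub events.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BufferUntilWatermark extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<String>>> bufferSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));

  @StateId("current")
  private final StateSpec<ValueState<String>> currentSpec = StateSpecs.value(StringUtf8Coder.of());

  @StateId("flushed")
  private final StateSpec<ValueState<Boolean>> flushedSpec = StateSpecs.value(BooleanCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @Timestamp Instant timestamp,
      @StateId("buffer") BagState<TimestampedValue<String>> buffer,
      @StateId("current") ValueState<String> current,
      @StateId("flushed") ValueState<Boolean> flushed,
      @TimerId("flush") Timer flush,
      OutputReceiver<KV<String, String>> out) {
    if (Boolean.TRUE.equals(flushed.read())) {
      // The buffer was already replayed for this key: process elements directly.
      String next = step(current.read(), element.getValue());
      current.write(next);
      out.output(KV.of(element.getKey(), next));
      return;
    }
    if (buffer.isEmpty().read()) {
      // First buffered element for this key: fire once the watermark passes it by 1 ms.
      flush.offset(Duration.millis(1)).setRelative();
    }
    buffer.add(TimestampedValue.of(element.getValue(), timestamp));
  }

  @OnTimer("flush")
  public void onFlush(
      @Key String key,
      @StateId("buffer") BagState<TimestampedValue<String>> buffer,
      @StateId("current") ValueState<String> current,
      @StateId("flushed") ValueState<Boolean> flushed,
      OutputReceiver<KV<String, String>> out) {
    String state = current.read();
    for (TimestampedValue<String> buffered : oldestFirst(buffer.read())) {
      state = step(state, buffered.getValue());
      out.output(KV.of(key, state));
    }
    current.write(state);
    buffer.clear();
    flushed.write(true);
  }

  /** Sorts buffered values by timestamp; MIN_VALUE-stamped BigQuery rows sort first. */
  static List<TimestampedValue<String>> oldestFirst(Iterable<TimestampedValue<String>> buffered) {
    List<TimestampedValue<String>> sorted = new ArrayList<>();
    buffered.forEach(sorted::add);
    sorted.sort(Comparator.comparing((TimestampedValue<String> tv) -> tv.getTimestamp()));
    return sorted;
  }

  /** Hypothetical stand-in for BusinessLogic(): append each input to the running state. */
  static String step(String current, String input) {
    return current == null ? input : current + "|" + input;
  }
}
```

In Sahil's pipeline this would take the place of ParDo.of(new BusinessLogic()), provided the flattened input is a keyed PCollection<KV<String, String>>. The per-key buffer grows until the BigQuery read completes, so startup memory depends on how much Pub/Sub traffic arrives in that interval.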


I think this pattern might be useful on its own, so if you decide to
implement it, it might be good to incorporate it into the core
transforms (we already have Wait.on(), which is somewhat similar). I can
imagine a mini-workflow that would take a bounded and an unbounded
PCollection, a DoFn, and a function to be applied on the DoFn first for
elements of the bounded PCollection, and only after that start processing
the unbounded one.


Jan

On 2/27/23 20:59, Reuven Lax via dev wrote:
> How large is this state spec stored in BQ? If the size isn't too
> large, you can read it from BQ and make it a side input into the DoFn.


Re: Consuming one PCollection before consuming another with Beam

2023-02-27 Thread Reuven Lax via dev
How large is this state spec stored in BQ? If the size isn't too large, you
can read it from BQ and make it a side input into the DoFn.

On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak 
wrote:

> We are trying to re-initialize our state specs in the BusinessLogic() DoFn
> from BQ.
> BQ has data about the state spec, and we would like to make sure that the
> state specs in our BusinessLogic() dofn are initialized before it starts
> consuming the pub/sub.
>
> This is for handling the case of redeployment of the dataflow jobs so that
> the states are preserved and the BusinessLogic() can work seamlessly as it
> was previously. All our dofns are operating in a global window and do not
> perform any aggregation.
>
> We are currently using Redis to preserve the state spec information but
> would like to explore using BQ as an alternative to Redis.
>
> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles  wrote:
>
>> My suggestion is to try to solve the problem in terms of what you want to
>> compute. Instead of trying to control the operational aspects like "read
>> all the BQ before reading Pubsub" there is presumably some reason that the
>> BQ data naturally "comes first", for example if its timestamps are earlier
>> or if there is a join or an aggregation that must include it. Whenever you
>> think you want to set up an operational dependency between two things that
>> "happen" in a pipeline, it is often best to pivot your thinking to the data
>> and what you are trying to compute, and the built-in dependencies will
>> solve the ordering problems.
>>
>> So - is there a way to describe your problem in terms of the data and
>> what you are trying to compute?
>>
>> Kenn
>>
>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev 
>> wrote:
>>
>>> First PCollections are completely unordered, so there is no guarantee on
>>> what order you'll see events in the flattened PCollection.
>>>
>>> There may be ways to process the BigQuery data in a separate transform
>>> first, but it depends on the structure of the data. How large is the
>>> BigQuery table? Are you doing any windowed aggregations here?
>>>
>>> Reuven
>>>
>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
>>>> Yes, this is a streaming pipeline.
>>>>
>>>> Some more details about existing implementation v/s what we want to
>>>> achieve.
>>>>
>>>> Current implementation:
>>>> Reading from pub-sub:
>>>>
>>>> Pipeline input = Pipeline.create(options);
>>>>
>>>> PCollection pubsubStream = input.apply("Read From Pubsub",
>>>>     PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>         .fromSubscription(inputSubscriptionId));
>>>>
>>>> Reading from bigquery:
>>>>
>>>> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
>>>>         .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>>     .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>
>>>> Merge the inputs:
>>>>
>>>> PCollection mergedInput =
>>>>     PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input",
>>>>         Flatten.pCollections());
>>>>
>>>> Business Logic:
>>>>
>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>>>>
>>>> Above logic is what we use currently in our pipeline.
>>>>
>>>> We want to make sure that we read from BigQuery first & pass the bqStream
>>>> through our BusinessLogic() before we start consuming pubsubStream.
>>>>
>>>> Is there a way to achieve this?
>>>>
>>>> Thanks,
>>>>
>>>> Sahil
>>>>
>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>>>>
>>>>> Can you explain this use case some more? Is this a streaming pipeline?
>>>>> If so, how are you reading from BigQuery?
>>>>>
>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a requirement wherein we are consuming input from pub/sub
>>>>>> (PubSubIO) as well as BQ (BQIO)
>>>>>>
>>>>>> We want to make sure that we consume the BQ stream first before we
>>>>>> start consuming the data from pub-sub. Is there a way to achieve this?
>>>>>> Can you please help with some code samples?
>>>>>>
>>>>>> Currently, we read data from big query using BigQueryIO into a
>>>>>> PCollection & also read data from pubsub using PubsubIO. We then use the
>>>>>> flatten transform in this manner.
>>>>>>
>>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>>
>>>>>> kvPairs =
>>>>>>     PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input",
>>>>>>         Flatten.pCollections());
>>>>>>
>>>>>> Thanks,
>>>>>> Sahil
>>>>>>


Re: Consuming one PCollection before consuming another with Beam

2023-02-27 Thread Niel Markwick via dev
Why not pass the BQ data as a side input to your transform?

Side inputs are read fully and materialised before the transform starts.

This will allow your transform to initialize its state before processing
any elements from the PubSub input.
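A minimal sketch of the side-input approach in Java. It assumes (hypothetically) that the BigQuery rows have STRING columns named `key` and `state`, that each Pub/Sub message carries a `key` attribute, and that the per-key state is a single string. The BigQuery snapshot becomes a map side input, and a stateful DoFn seeds its ValueState from it the first time it sees a key. View.asMap() expects one row per key, and a large table may strain the runner's side-input handling, so both are worth checking before relying on this.

```java
import com.google.api.services.bigquery.model.TableRow;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;

public class SideInputSeeding {

  // Wires the pipeline: BigQuery snapshot -> map side input; Pub/Sub -> keyed stateful DoFn.
  // The column names "key"/"state" and the "key" attribute are hypothetical.
  static void build(Pipeline p, String bqQuery, String subscription) {
    PCollectionView<Map<String, String>> seedView = p
        .apply("Read from BQ", BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
        .apply("Snapshot to KV", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((TableRow row) -> KV.of((String) row.get("key"), (String) row.get("state"))))
        .apply("As map", View.asMap());

    p.apply("Read From Pubsub",
            PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscription))
        .apply("Key messages", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((PubsubMessage m) -> KV.of(
                m.getAttribute("key"), new String(m.getPayload(), StandardCharsets.UTF_8))))
        .apply("Business Logic", ParDo.of(new SeededLogic(seedView)).withSideInputs(seedView));
  }

  static class SeededLogic extends DoFn<KV<String, String>, String> {
    private final PCollectionView<Map<String, String>> seedView;

    // Per-key state kept by Beam; empty for a key until the first write.
    @StateId("current")
    private final StateSpec<ValueState<String>> currentSpec =
        StateSpecs.value(StringUtf8Coder.of());

    SeededLogic(PCollectionView<Map<String, String>> seedView) {
      this.seedView = seedView;
    }

    @ProcessElement
    public void processElement(ProcessContext c, @StateId("current") ValueState<String> current) {
      String state = current.read();
      if (state == null) {
        // No Beam state for this key yet: fall back to the BigQuery snapshot (may also be null).
        state = c.sideInput(seedView).get(c.element().getKey());
      }
      String updated = update(state, c.element().getValue());
      current.write(updated);
      c.output(updated);
    }

    // Hypothetical stand-in for the real business logic: appends the event to prior state.
    static String update(String previous, String event) {
      return previous == null ? event : previous + "|" + event;
    }
  }
}
```

Once a key's ValueState has been written, Beam state takes precedence, so the snapshot is only consulted for keys that have no state yet in the running job.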

On Mon, 27 Feb 2023, 20:43 Daniel Collins via dev, 
wrote:

> It sounds like what you're doing here might be best done outside the Beam
> model. Instead of performing the initial computation by reading from BQ into
> a PCollection, perform it using the BigQuery client library in the same
> manner as you currently do to load the data from Redis.

Re: Consuming one PCollection before consuming another with Beam

2023-02-27 Thread Daniel Collins via dev
It sounds like what you're doing here might be best done outside the Beam
model. Instead of performing the initial computation by reading from BQ into
a PCollection, perform it using the BigQuery client library in the same
manner as you currently do to load the data from Redis.
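A minimal sketch of this suggestion in Java, assuming the google-cloud-bigquery client library is on the worker classpath and reusing hypothetical `key`/`state` STRING columns with string-valued state. The snapshot is loaded once per worker JVM in @Setup (guarded by a static field, since several DoFn instances can share one worker), and each key's ValueState is seeded from it on first use.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class ClientSeededLogic extends DoFn<KV<String, String>, String> {
  // One snapshot per worker JVM, shared by every instance of this DoFn.
  private static volatile Map<String, String> snapshot;

  // Hypothetical standard-SQL query returning "key" and "state" STRING columns.
  private final String query;

  @StateId("current")
  private final StateSpec<ValueState<String>> currentSpec = StateSpecs.value(StringUtf8Coder.of());

  public ClientSeededLogic(String query) {
    this.query = query;
  }

  @Setup
  public void setup() throws InterruptedException {
    synchronized (ClientSeededLogic.class) {
      if (snapshot == null) {
        snapshot = load(query);
      }
    }
  }

  // Runs the query with the BigQuery client library, outside the Beam model.
  static Map<String, String> load(String query) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    QueryJobConfiguration config =
        QueryJobConfiguration.newBuilder(query).setUseLegacySql(false).build();
    Map<String, String> rows = new HashMap<>();
    for (FieldValueList row : bigquery.query(config).iterateAll()) {
      rows.put(row.get("key").getStringValue(), row.get("state").getStringValue());
    }
    return rows;
  }

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("current") ValueState<String> current) {
    String state = seed(current.read(), snapshot, c.element().getKey());
    String event = c.element().getValue();
    // Hypothetical business logic: append the event to the prior state.
    String updated = state == null ? event : state + "|" + event;
    current.write(updated);
    c.output(updated);
  }

  // Prefer existing Beam state; fall back to the BigQuery snapshot for keys not yet seen.
  static String seed(String existing, Map<String, String> snapshot, String key) {
    return existing != null ? existing : snapshot.get(key);
  }
}
```

Holding the whole table in memory on every worker is the main cost of this design; looking keys up lazily instead trades that memory for one query per previously unseen key.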

On Mon, Feb 27, 2023 at 2:07 PM Sahil Modak via dev 
wrote:

> We are trying to re-initialize the state specs in our BusinessLogic() DoFn
> from BQ. BQ has data about the state spec, and we would like to make sure
> that the state specs in our BusinessLogic() DoFn are initialized before it
> starts consuming from Pub/Sub.
>
> This is for handling redeployment of the Dataflow jobs, so that the state
> is preserved and BusinessLogic() can keep working seamlessly as it did
> before. All our DoFns operate in a global window and do not perform any
> aggregation.
>
> We are currently using Redis to preserve the state spec information but
> would like to explore using BQ as an alternative to Redis.


Re: Consuming one PCollection before consuming another with Beam

2023-02-27 Thread Sahil Modak via dev
We are trying to re-initialize the state specs in our BusinessLogic() DoFn
from BQ. BQ has data about the state spec, and we would like to make sure
that the state specs in our BusinessLogic() DoFn are initialized before it
starts consuming from Pub/Sub.

This is for handling redeployment of the Dataflow jobs, so that the state
is preserved and BusinessLogic() can keep working seamlessly as it did
before. All our DoFns operate in a global window and do not perform any
aggregation.

We are currently using Redis to preserve the state spec information but
would like to explore using BQ as an alternative to Redis.

On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles  wrote:

> My suggestion is to try to solve the problem in terms of what you want to
> compute. Instead of trying to control the operational aspects like "read
> all the BQ before reading Pubsub" there is presumably some reason that the
> BQ data naturally "comes first", for example if its timestamps are earlier
> or if there is a join or an aggregation that must include it. Whenever you
> think you want to set up an operational dependency between two things that
> "happen" in a pipeline, it is often best to pivot your thinking to the data
> and what you are trying to compute, and the built-in dependencies will
> solve the ordering problems.
>
> So - is there a way to describe your problem in terms of the data and what
> you are trying to compute?
>
> Kenn


Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Kenneth Knowles
My suggestion is to try to solve the problem in terms of what you want to
compute. Instead of trying to control the operational aspects like "read
all the BQ before reading Pubsub" there is presumably some reason that the
BQ data naturally "comes first", for example if its timestamps are earlier
or if there is a join or an aggregation that must include it. Whenever you
think you want to set up an operational dependency between two things that
"happen" in a pipeline, it is often best to pivot your thinking to the data
and what you are trying to compute, and the built-in dependencies will
solve the ordering problems.

So - is there a way to describe your problem in terms of the data and what
you are trying to compute?

Kenn

On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev 
wrote:

> First, PCollections are completely unordered, so there is no guarantee
> about the order in which you'll see events in the flattened PCollection.
>
> There may be ways to process the BigQuery data in a separate transform
> first, but it depends on the structure of the data. How large is the
> BigQuery table? Are you doing any windowed aggregations here?
>
> Reuven


Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Reuven Lax via dev
First, PCollections are completely unordered, so there is no guarantee
about the order in which you'll see events in the flattened PCollection.

There may be ways to process the BigQuery data in a separate transform
first, but it depends on the structure of the data. How large is the
BigQuery table? Are you doing any windowed aggregations here?

Reuven

On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak 
wrote:

> Yes, this is a streaming pipeline.
>
> Some more details about the existing implementation vs. what we want to
> achieve.
>
> Current implementation:
> Reading from Pub/Sub:
>
> Pipeline input = Pipeline.create(options);
>
> PCollection pubsubStream = input.apply("Read From Pubsub",
>     PubsubIO.readMessagesWithAttributesAndMessageId()
>         .fromSubscription(inputSubscriptionId));
>
>
> Reading from BigQuery:
>
> PCollection bqStream = input
>     .apply("Read from BQ",
>         BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
>     .apply("JSon Transform", AsJsons.of(TableRow.class));
>
>
> Merge the inputs:
>
> PCollection mergedInput =
>     PCollectionList.of(pubsubStream).and(bqStream)
>         .apply("Merge Input", Flatten.pCollections());
>
>
> Business logic:
>
> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>
>
> The above logic is what we currently use in our pipeline.
>
> We want to make sure that we read from BigQuery first and pass bqStream
> through our BusinessLogic() before we start consuming pubsubStream.
>
> Is there a way to achieve this?
>
>
> Thanks,
>
> Sahil
>


Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Sahil Modak via dev
Yes, this is a streaming pipeline.

Some more details about the existing implementation vs. what we want to
achieve.

Current implementation:
Reading from Pub/Sub:

Pipeline input = Pipeline.create(options);

PCollection pubsubStream = input.apply("Read From Pubsub",
    PubsubIO.readMessagesWithAttributesAndMessageId()
        .fromSubscription(inputSubscriptionId));


Reading from BigQuery:

PCollection bqStream = input
    .apply("Read from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("JSon Transform", AsJsons.of(TableRow.class));


Merge the inputs:

PCollection mergedInput =
    PCollectionList.of(pubsubStream).and(bqStream)
        .apply("Merge Input", Flatten.pCollections());


Business logic:

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));


The above logic is what we currently use in our pipeline.

We want to make sure that we read from BigQuery first and pass bqStream
through our BusinessLogic() before we start consuming pubsubStream.

Is there a way to achieve this?


Thanks,

Sahil


On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:

> Can you explain this use case some more? Is this a streaming pipeline? If
> so, how are you reading from BigQuery?


Re: Consuming one PCollection before consuming another with Beam

2023-02-23 Thread Reuven Lax via dev
Can you explain this use case some more? Is this a streaming pipeline? If
so, how are you reading from BigQuery?

On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev 
wrote:

> Hi,
>
> We have a requirement wherein we are consuming input from Pub/Sub
> (PubsubIO) as well as BQ (BigQueryIO).
>
> We want to make sure that we consume the BQ stream before we start
> consuming the data from Pub/Sub. Is there a way to achieve this? Can you
> please help with some code samples?
>
> Currently, we read data from BigQuery into a PCollection using BigQueryIO
> and also read data from Pub/Sub using PubsubIO. We then use the Flatten
> transform in this manner:
>
> PCollection pubsubKvPairs = ...;   // read from Pub/Sub using PubsubIO
> PCollection bigqueryKvPairs = ...; // read from BigQuery using BigQueryIO
>
> PCollection kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>     .apply("Merge Input", Flatten.pCollections());
>
>
> Thanks,
> Sahil
>
>


Consuming one PCollection before consuming another with Beam

2023-02-23 Thread Sahil Modak via dev
Hi,

We have a requirement wherein we are consuming input from Pub/Sub
(PubsubIO) as well as BQ (BigQueryIO).

We want to make sure that we consume the BQ stream before we start
consuming the data from Pub/Sub. Is there a way to achieve this? Can you
please help with some code samples?

Currently, we read data from BigQuery into a PCollection using BigQueryIO
and also read data from Pub/Sub using PubsubIO. We then use the Flatten
transform in this manner:

PCollection pubsubKvPairs = ...;   // read from Pub/Sub using PubsubIO
PCollection bigqueryKvPairs = ...; // read from BigQuery using BigQueryIO

PCollection kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
    .apply("Merge Input", Flatten.pCollections());


Thanks,
Sahil