Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Niel Markwick via dev
Regarding ordering: anything that requires inputs to arrive in a specific
order in Beam will be problematic due to the nature of parallel processing -
you will always get race conditions.

Assuming you are still intending to Flatten the BigQuery and Pub/Sub
PCollections, using Wait.on() before flattening the two PCollections will not
make much difference, as there is still a strong likelihood that the BQ and
Pub/Sub records will be interleaved in the flattened output.

If the BQ read is used to update internal state, then I assume that you need
to store that state somewhere in your BusinessLogic() DoFn. If this storage
is in RAM, then every worker instance of your BusinessLogic() DoFn needs
access to all records of that BQ read, and the only way to do this is through
a side input. If you send this data in the normal input and you have multiple
workers, each worker will only get some, not all, of the BQ records, so each
worker's internal state would be inconsistent.

 > Using the BigQuery client would mean we would have to run individual
queries for each of these 300k keys from the BusinessLogic() DoFn, which
operates on KVs in a global window

Or read all the BigQuery records at once on BusinessLogic() startup and
store them in the internal state ... which ends up being the same as using
a side input.
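
As a rough sketch of how that side input could be wired up (a sketch only:
input, bqQuery and pubsubStream are the names from the pipeline code quoted
later in this thread, while the "key"/"value" columns and the
BusinessLogic(stateView) constructor are my assumptions):

PCollectionView<Map<String, String>> stateView =
    input
        .apply("Read state from BQ",
            BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
        .apply("Key rows by state key",
            MapElements
                .into(TypeDescriptors.kvs(
                    TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(row -> KV.of(
                    (String) row.get("key"),      // hypothetical column
                    (String) row.get("value")))) // hypothetical column
        .apply("As map side input", View.asMap());

pubsubStream.apply("Business Logic",
    ParDo.of(new BusinessLogic(stateView)).withSideInputs(stateView));

Inside @ProcessElement, context.sideInput(stateView) then returns the
complete map on every worker, which is exactly the property described above.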


-- 

•  Niel Markwick
•  Cloud Solutions Architect
•  Google Belgium
•  ni...@google.com
•  +32 2 894 6771

Google Belgium NV/SA, Steenweg op Etterbeek 180, 1040 Brussel, Belgie.
RPR: 0878.065.378

If you have received this communication by mistake, please don't forward it
to anyone else (it may contain confidential or privileged information),
please erase all copies of it, including all attachments, and please let
the sender know it went to the wrong person. Thanks



On Wed, 1 Mar 2023 at 02:15, Sahil Modak via dev 
wrote:

> The number of keys/amount of data in BQ would not be constant and would
> grow with time.
>
> A rough estimate would be around 300k keys with an average size of 5 KB per
> key. Both the count of the keys and the size of each key would be
> feature-dependent (based on the upstream pipelines), and we won't have
> control over this in the future.
>
> Using the BigQuery client would mean we would have to run individual
> queries for each of these 300k keys from the BusinessLogic() DoFn, which
> operates on KVs in a global window.
>
> Also, the order of the data from BQ would not matter to us, since the only
> thing we are trying to solve here is regaining the state spec information
> before starting to consume Pub/Sub.
>
> I will explore using Wait.on(bigquery) before the Pub/Sub read, since I am
> not sure a side input would be the best option here.
>
>
> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:
>
>> I'm also curious how much you depend on order to get the state contents
>> right. The ordering of the side input will be arbitrary, and even the
>> streaming input can have plenty of out-of-order messages. So I want to
>> think about what the data dependencies are that result in the ordering
>> requirement. Or, if there are none and you just want to know that all the
>> past data has been processed, Niel's idea is one solution. It isn't
>> parallel, though.
>>
>> Kenn
>>
>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:
>>
>>> How large is this state spec stored in BQ? If the size isn't too large,
>>> you can read it from BQ and make it a side input into the DoFn.
>>>
>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
 We are trying to re-initialize our state specs in the BusinessLogic()
 DoFn from BQ.
 BQ has data about the state spec, and we would like to make sure that
 the state specs in our BusinessLogic() DoFn are initialized before it
 starts consuming from Pub/Sub.

 This is for handling the case of redeployment of the Dataflow jobs, so
 that the state is preserved and the BusinessLogic() can work seamlessly
 as it did previously. All our DoFns are operating in a global window and
 do not perform any aggregation.

 We are currently using Redis to preserve the state spec information but
 would like to explore using BQ as an alternative to Redis.
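
To make the term concrete: a "state spec" here is Beam per-key state
declared inside the DoFn, roughly of this shape (a sketch with assumed
names, not our actual code):

@StateId("businessState")
private final StateSpec<ValueState<String>> businessStateSpec =
    StateSpecs.value(StringUtf8Coder.of());

The question is how to repopulate such state from BQ before the first
Pub/Sub element is processed.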

 On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
 wrote:

> My suggestion is to try to solve the problem in terms of what you want
> to compute. Instead of trying to control the operational aspects like "read
> all the BQ before reading Pubsub" there is presumably some reason that the
> BQ data naturally "comes first", for example if its timestamps are earlier
> or if there is a join or an aggregation that must include it. Whenever you
> think you want to set up an operational dependency between two things that
> "happen" in a pipeline, it is often best to pivot your thinking to the data
> and what you are trying to compute, and the built-in dependencies will
> solve the ordering problems.

Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Sahil Modak via dev
The number of keys/amount of data in BQ would not be constant and would grow
with time.

A rough estimate would be around 300k keys with an average size of 5 KB per
key. Both the count of the keys and the size of each key would be
feature-dependent (based on the upstream pipelines), and we won't have
control over this in the future.

Using the BigQuery client would mean we would have to run individual queries
for each of these 300k keys from the BusinessLogic() DoFn, which operates on
KVs in a global window.

Also, the order of the data from BQ would not matter to us, since the only
thing we are trying to solve here is regaining the state spec information
before starting to consume Pub/Sub.

I will explore using Wait.on(bigquery) before the Pub/Sub read, since I am
not sure a side input would be the best option here.
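
For reference, a minimal sketch of the Wait.on() wiring (a sketch only: it
assumes the Pub/Sub messages are first converted to the same element type as
bqStream, here a hypothetical pubsubJson PCollection<String>, since Flatten
needs matching element types):

// Hold back the Pub/Sub elements until the bounded BQ read has completed.
PCollection<String> gatedPubsub =
    pubsubJson.apply("Wait for BQ", Wait.on(bqStream));

PCollection<String> mergedInput =
    PCollectionList.of(gatedPubsub).and(bqStream)
        .apply("Merge Input", Flatten.pCollections());

Because bqStream is bounded, Wait.on(bqStream) releases the Pub/Sub elements
only once the BQ read has finished.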


On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:

> I'm also curious how much you depend on order to get the state contents
> right. The ordering of the side input will be arbitrary, and even the
> streaming input can have plenty of out-of-order messages. So I want to
> think about what the data dependencies are that result in the ordering
> requirement. Or, if there are none and you just want to know that all the
> past data has been processed, Niel's idea is one solution. It isn't
> parallel, though.
>
> Kenn
>
> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:
>
>> How large is this state spec stored in BQ? If the size isn't too large,
>> you can read it from BQ and make it a side input into the DoFn.
>>
>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak 
>> wrote:
>>
>>> We are trying to re-initialize our state specs in the BusinessLogic()
>>> DoFn from BQ.
>>> BQ has data about the state spec, and we would like to make sure that
>>> the state specs in our BusinessLogic() DoFn are initialized before it
>>> starts consuming from Pub/Sub.
>>>
>>> This is for handling the case of redeployment of the Dataflow jobs, so
>>> that the state is preserved and the BusinessLogic() can work seamlessly
>>> as it did previously. All our DoFns are operating in a global window
>>> and do not perform any aggregation.
>>>
>>> We are currently using Redis to preserve the state spec information but
>>> would like to explore using BQ as an alternative to Redis.
>>>
>>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
>>> wrote:
>>>
 My suggestion is to try to solve the problem in terms of what you want
 to compute. Instead of trying to control the operational aspects like "read
 all the BQ before reading Pubsub" there is presumably some reason that the
 BQ data naturally "comes first", for example if its timestamps are earlier
 or if there is a join or an aggregation that must include it. Whenever you
 think you want to set up an operational dependency between two things that
 "happen" in a pipeline, it is often best to pivot your thinking to the data
 and what you are trying to compute, and the built-in dependencies will
 solve the ordering problems.

 So - is there a way to describe your problem in terms of the data and
 what you are trying to compute?

 Kenn

 On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <
 dev@beam.apache.org> wrote:

> First, PCollections are completely unordered, so there is no guarantee
> of the order in which you'll see events in the flattened PCollection.
>
> There may be ways to process the BigQuery data in a separate transform
> first, but it depends on the structure of the data. How large is the
> BigQuery table? Are you doing any windowed aggregations here?
>
> Reuven
>
> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
> smo...@paloaltonetworks.com> wrote:
>
>> Yes, this is a streaming pipeline.
>>
>> Some more details about the existing implementation vs. what we want to
>> achieve.
>>
>> Current implementation:
>> Reading from pub-sub:
>>
>> Pipeline input = Pipeline.create(options);
>>
>> PCollection<PubsubMessage> pubsubStream = input.apply("Read From Pubsub",
>>     PubsubIO.readMessagesWithAttributesAndMessageId()
>>         .fromSubscription(inputSubscriptionId));
>>
>>
>> Reading from bigquery:
>>
>> PCollection<String> bqStream = input.apply("Read from BQ", BigQueryIO
>>     .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>     .apply("JSon Transform", AsJsons.of(TableRow.class));
>>
>>
>> Merge the inputs:
>>
>> PCollection mergedInput =
>>     PCollectionList.of(pubsubStream).and(bqStream)
>>         .apply("Merge Input", Flatten.pCollections());
>>
>>
>> Business Logic:
>>
>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>>
>>
>>
>>
>> The above logic is what we currently use in our pipeline.
>>
>> We want to make sure that we read from BigQuery first & pass the
>> bqStream through our BusinessLogic() before we start consuming
>> pubsubStream.

Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Kenneth Knowles
I'm also curious how much you depend on order to get the state contents
right. The ordering of the side input will be arbitrary, and even the
streaming input can have plenty of out-of-order messages. So I want to
think about what the data dependencies are that result in the ordering
requirement. Or, if there are none and you just want to know that all the
past data has been processed, Niel's idea is one solution. It isn't
parallel, though.

Kenn

On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:

> How large is this state spec stored in BQ? If the size isn't too large,
> you can read it from BQ and make it a side input into the DoFn.
>
> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak 
> wrote:
>
>> We are trying to re-initialize our state specs in the BusinessLogic()
>> DoFn from BQ.
>> BQ has data about the state spec, and we would like to make sure that the
>> state specs in our BusinessLogic() DoFn are initialized before it starts
>> consuming from Pub/Sub.
>>
>> This is for handling the case of redeployment of the Dataflow jobs, so
>> that the state is preserved and the BusinessLogic() can work seamlessly
>> as it did previously. All our DoFns are operating in a global window and
>> do not perform any aggregation.
>>
>> We are currently using Redis to preserve the state spec information but
>> would like to explore using BQ as an alternative to Redis.
>>
>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles  wrote:
>>
>>> My suggestion is to try to solve the problem in terms of what you want
>>> to compute. Instead of trying to control the operational aspects like "read
>>> all the BQ before reading Pubsub" there is presumably some reason that the
>>> BQ data naturally "comes first", for example if its timestamps are earlier
>>> or if there is a join or an aggregation that must include it. Whenever you
>>> think you want to set up an operational dependency between two things that
>>> "happen" in a pipeline, it is often best to pivot your thinking to the data
>>> and what you are trying to compute, and the built-in dependencies will
>>> solve the ordering problems.
>>>
>>> So - is there a way to describe your problem in terms of the data and
>>> what you are trying to compute?
>>>
>>> Kenn
>>>
>>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev 
>>> wrote:
>>>
 First, PCollections are completely unordered, so there is no guarantee
 of the order in which you'll see events in the flattened PCollection.

 There may be ways to process the BigQuery data in a separate transform
 first, but it depends on the structure of the data. How large is the
 BigQuery table? Are you doing any windowed aggregations here?

 Reuven

 On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
 smo...@paloaltonetworks.com> wrote:

> Yes, this is a streaming pipeline.
>
> Some more details about the existing implementation vs. what we want to
> achieve.
>
> Current implementation:
> Reading from pub-sub:
>
> Pipeline input = Pipeline.create(options);
>
> PCollection<PubsubMessage> pubsubStream = input.apply("Read From Pubsub",
>     PubsubIO.readMessagesWithAttributesAndMessageId()
>         .fromSubscription(inputSubscriptionId));
>
>
> Reading from bigquery:
>
> PCollection<String> bqStream = input.apply("Read from BQ", BigQueryIO
>     .readTableRows().fromQuery(bqQuery).usingStandardSql())
>     .apply("JSon Transform", AsJsons.of(TableRow.class));
>
>
> Merge the inputs:
>
> PCollection mergedInput =
>     PCollectionList.of(pubsubStream).and(bqStream)
>         .apply("Merge Input", Flatten.pCollections());
>
>
> Business Logic:
>
> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>
>
>
>
> The above logic is what we currently use in our pipeline.
>
> We want to make sure that we read from BigQuery first & pass the bqStream 
> through our BusinessLogic() before we start consuming pubsubStream.
>
> Is there a way to achieve this?
>
>
> Thanks,
>
> Sahil
>
>
> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>
>> Can you explain this use case some more? Is this a streaming
>> pipeline? If so, how are you reading from BigQuery?
>>
>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hi,
>>>
>>> We have a requirement wherein we are consuming input from Pub/Sub
>>> (PubsubIO) as well as BQ (BigQueryIO).
>>>
>>> We want to make sure that we consume the BQ stream first before we
>>> start consuming the data from Pub/Sub. Is there a way to achieve this?
>>> Can you please help with some code samples?
>>>
>>> Currently, we read data from BigQuery using BigQueryIO into a
>>> PCollection & also read data from Pub/Sub using PubsubIO. We then use
>>> the Flatten transform to merge the two PCollections.

Re: Dependabot questions

2023-02-28 Thread Danny McCormick via dev
AFAIK Dependabot doesn't have a great replacement for this. I'm not sure
why the dependency reports stopped, but we could probably try to fix them -
it looks like they stopped working in October -
https://lists.apache.org/list?dev@beam.apache.org:2021-10:dependency%20report.
We still have the job that generates the empty reports -
https://github.com/apache/beam/blob/fed35133ee1cb9eb0c5ec8a1b13a7c75835a1510/.test-infra/jenkins/job_Dependency_Check.groovy#L43

> Also, I noticed that some dependencies are outdated, yet not updated by
Dependabot. Possibly because a prior update PR was silenced. Is it
possible to see the state of which dependencies are currently opted out?

There's not an awesome view of this - looking through the logs at
https://github.com/apache/beam/network/updates/615364619 is the best I'm
aware of, though a better view was promised a year and a half ago -
https://github.com/dependabot/dependabot-core/issues/2255#issuecomment-838622025

On Mon, Feb 27, 2023 at 8:37 PM Valentyn Tymofieiev via dev <
dev@beam.apache.org> wrote:

> I noticed that human-readable dependency reports are not being generated.
> Can this functionality be replaced with Dependabot?
>
> Does Dependabot provide a view of what is currently outdated from its
> standpoint?
>
> Also, I noticed that some dependencies are outdated, yet not updated by
> Dependabot. Possibly because a prior update PR was silenced. Is it
> possible to see the state of which dependencies are currently opted out?
>
>
> Thanks!
>
>
>


Beam High Priority Issue Report (36)

2023-02-28 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 is flaky
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19465 Explore possibilities to lower 
in-use IP address quota footprint.
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/25412 [Feature Request]: Google Cloud 
Bigtable Change Stream Connector
https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21645 
beam_PostCommit_XVR_GoUsingJava_Dataflow fails on some test transforms
https://github.com/apache/beam/issues/21476 

Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Jan Lukavský
In the case that the data is too large for a side input, you could do the
same by reassigning the timestamps of the BQ input to
BoundedWindow.TIMESTAMP_MIN_VALUE (you would have to do that in a
stateful DoFn with a timer having outputTimestamp set to
TIMESTAMP_MIN_VALUE to hold the watermark, or using a splittable DoFn, or,
if BQ allows you to specify a timestamp function, using that directly).


In your BusinessLogic() you would set a timer that waits for the
watermark to move (e.g. timer.offset(1).setRelative()) and buffer
everything until the timer fires. Because the input from BQ is bounded,
its watermark will eventually advance to TIMESTAMP_MAX_VALUE, which will
fire the timer and flush the buffer.
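
A minimal sketch of the buffering half of this idea (a sketch only: it
assumes the merged input is keyed as KV<String, String>, since state and
timers require keyed input; a ValueState flag could let elements pass
straight through after the first flush, omitted here for brevity):

class BufferUntilWatermarkMoves
    extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("buffer")
  private final StateSpec<BagState<KV<String, String>>> bufferSpec =
      StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<KV<String, String>> buffer,
      @TimerId("flush") Timer flush) {
    // Buffer the element and (re)set an event-time timer just past the
    // current watermark; it cannot fire while the BQ input holds the
    // watermark back.
    buffer.add(element);
    flush.offset(Duration.millis(1)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext context,
      @StateId("buffer") BagState<KV<String, String>> buffer) {
    // The BQ input is bounded, so the watermark eventually jumps forward,
    // the timer fires, and the buffered elements are released in one go.
    for (KV<String, String> kv : buffer.read()) {
      context.output(kv);
    }
    buffer.clear();
  }
}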


I think this pattern might be useful on its own, so if you decide to
implement it, it might be good to incorporate it into the core
transforms (we already have Wait.on(), which is somewhat similar). I can
imagine a mini-workflow that takes a bounded and an unbounded
PCollection plus a DoFn, applies the DoFn first to elements of the
bounded PCollection, and only after that starts processing
the unbounded one.


 Jan

On 2/27/23 20:59, Reuven Lax via dev wrote:
> How large is this state spec stored in BQ? If the size isn't too large,
> you can read it from BQ and make it a side input into the DoFn.

