Re: [DISCUSS] Provide MultimapUserStateHandler interface in StateRequestHandlers

2023-02-24 Thread Alan Zhang
Hi Robert,

Thanks for confirming that the Fn API already supports the
MultimapUserState use cases; I really appreciate it! I also agree that
how to integrate it depends on each runner's implementation.

A follow-up question:

   - Is there any runner that has already implemented these Multimap
     protocols?
      - I didn't find a runner (e.g. Dataflow, Flink, Spark, or Samza) that
        defines handlers (code like [1]) for handling Multimap state
        requests, so I think the answer is NO. But I wanted to double-check
        with you.
      - I'm asking because I want to know: 1) whether the existing Fn APIs
        for MultimapUserState are already used in production, and 2) whether
        we can build/abstract some generic layers (like what Beam does in
        StateRequestHandlers now) to benefit other runners.
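
To make the second point concrete, here is a rough sketch of what such a
generic layer might look like. It is purely hypothetical:
MultimapUserStateHandler, its method names, and the in-memory implementation
are assumptions for illustration and are not part of Beam's
StateRequestHandlers. Windows and coders are left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface: K is the user key, MK the map key, V the value type.
// The name and methods are illustrative assumptions, not an existing Beam API.
interface MultimapUserStateHandler<K, MK, V> {
  Iterable<V> get(K key, MK mapKey);                  // values under one map key
  Iterable<MK> keys(K key);                           // all map keys for a user key
  void append(K key, MK mapKey, Iterable<V> values);  // add values to one map key
  void clear(K key, MK mapKey);                       // remove one map key
  void clearAll(K key);                               // remove every map key
}

// Toy in-memory implementation; a real runner would delegate to its state backend.
final class InMemoryMultimapUserStateHandler<K, MK, V>
    implements MultimapUserStateHandler<K, MK, V> {
  private final Map<K, Map<MK, List<V>>> state = new HashMap<>();

  @Override
  public Iterable<V> get(K key, MK mapKey) {
    Map<MK, List<V>> perKey = state.get(key);
    if (perKey == null || !perKey.containsKey(mapKey)) {
      return new ArrayList<V>();
    }
    return new ArrayList<V>(perKey.get(mapKey)); // defensive copy
  }

  @Override
  public Iterable<MK> keys(K key) {
    Map<MK, List<V>> perKey = state.get(key);
    return perKey == null ? new ArrayList<MK>() : new ArrayList<MK>(perKey.keySet());
  }

  @Override
  public void append(K key, MK mapKey, Iterable<V> values) {
    List<V> target = state
        .computeIfAbsent(key, k -> new LinkedHashMap<>())
        .computeIfAbsent(mapKey, mk -> new ArrayList<>());
    for (V value : values) {
      target.add(value);
    }
  }

  @Override
  public void clear(K key, MK mapKey) {
    Map<MK, List<V>> perKey = state.get(key);
    if (perKey != null) {
      perKey.remove(mapKey);
    }
  }

  @Override
  public void clearAll(K key) {
    state.remove(key);
  }
}

final class MultimapUserStateDemo {
  public static void main(String[] args) {
    MultimapUserStateHandler<String, String, Integer> handler =
        new InMemoryMultimapUserStateHandler<>();
    handler.append("user-1", "clicks", Arrays.asList(1, 2));
    handler.append("user-1", "views", Arrays.asList(7));
    handler.clear("user-1", "views");
    System.out.println(handler.keys("user-1"));
    System.out.println(handler.get("user-1", "clicks"));
  }
}
```

If something along these lines existed, a runner would presumably only need
to supply its own implementation of the interface, much as runners do for the
existing handler interfaces.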

Have a good weekend!

On Fri, Feb 24, 2023 at 1:18 PM Robert Burke  wrote:

> The runners should be able to support Multimap User State portably over
> the FnApi already.
>
>
> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L937
>
> How that's supported on each SDK is a different matter though.
>
>
> On Fri, Feb 24, 2023, 12:57 PM Alan Zhang  wrote:
>
>> Appreciate it if anyone can help confirm and share thoughts.
>>
>> On Wed, Feb 22, 2023 at 11:46 PM Alan Zhang  wrote:
>>
>>> Hi Beam devs.
>>>
>>> According to the Fn State API design doc[1], the state type
>>> MultimapUserState is intended for supporting MapState/SetState. And the
>>> implementation[2] for this state type is ready on the SDK harness side.
>>> Each runner will be responsible for integrating it if they want to leverage
>>> it.
>>>
>>> Today Beam uses StateRequestHandlers to define handler interfaces for
>>> other state types, e.g. MultimapSideInputHandler for
>>> MultimapSideInput, BagUserStateHandler for BagUserState, etc.[3] This is
>>> great since each runner can implement these handler interfaces then the Fn
>>> state API integration is done.
>>>
>>> In order to support MapState/SetState, I think we will need to provide
>>> a MultimapUserStateHandler interface in StateRequestHandlers and allow the
>>> runners to implement it.
>>>
>>> What do you think?
>>>
>>> Feel free to correct me if there is any incorrect understanding since
>>> I'm new to the Beam world.
>>>
>>> Btw, I saw Flink Python used MultimapSideInput to support MapState[4]
>>> but I think this is not recommended since MultimapUserState is available
>>> today. But please correct me if I'm wrong.
>>>
>>>
>>> [1] https://s.apache.org/beam-fn-state-api-and-bundle-processing
>>> 
>>> [2] https://github.com/apache/beam/pull/15238
>>> [3]
>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java#L192
>>> [4]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>> --
>>> Thanks,
>>> Alan
>>>
>>
>>
>> --
>> Thanks,
>> Alan
>>
>

-- 
Thanks,
Alan


Re: [DISCUSS] Provide MultimapUserStateHandler interface in StateRequestHandlers

2023-02-24 Thread Robert Burke
The runners should be able to support Multimap User State portably over the
FnApi already.

https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L937

How that's supported on each SDK is a different matter though.


On Fri, Feb 24, 2023, 12:57 PM Alan Zhang  wrote:

> Appreciate it if anyone can help confirm and share thoughts.
>
> On Wed, Feb 22, 2023 at 11:46 PM Alan Zhang  wrote:
>
>> Hi Beam devs.
>>
>> According to the Fn State API design doc[1], the state type
>> MultimapUserState is intended for supporting MapState/SetState. And the
>> implementation[2] for this state type is ready on the SDK harness side.
>> Each runner will be responsible for integrating it if they want to leverage
>> it.
>>
>> Today Beam uses StateRequestHandlers to define handler interfaces for
>> other state types, e.g. MultimapSideInputHandler for
>> MultimapSideInput, BagUserStateHandler for BagUserState, etc.[3] This is
>> great since each runner can implement these handler interfaces then the Fn
>> state API integration is done.
>>
>> In order to support MapState/SetState, I think we will need to provide
>> a MultimapUserStateHandler interface in StateRequestHandlers and allow the
>> runners to implement it.
>>
>> What do you think?
>>
>> Feel free to correct me if there is any incorrect understanding since I'm
>> new to the Beam world.
>>
>> Btw, I saw Flink Python used MultimapSideInput to support MapState[4] but
>> I think this is not recommended since MultimapUserState is available today.
>> But please correct me if I'm wrong.
>>
>>
>> [1] https://s.apache.org/beam-fn-state-api-and-bundle-processing
>> 
>> [2] https://github.com/apache/beam/pull/15238
>> [3]
>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java#L192
>> [4]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>> --
>> Thanks,
>> Alan
>>
>
>
> --
> Thanks,
> Alan
>


Re: [DISCUSS] Provide MultimapUserStateHandler interface in StateRequestHandlers

2023-02-24 Thread Alan Zhang
Appreciate it if anyone can help confirm and share thoughts.

On Wed, Feb 22, 2023 at 11:46 PM Alan Zhang  wrote:

> Hi Beam devs.
>
> According to the Fn State API design doc[1], the state type
> MultimapUserState is intended for supporting MapState/SetState. And the
> implementation[2] for this state type is ready on the SDK harness side.
> Each runner will be responsible for integrating it if they want to leverage
> it.
>
> Today Beam uses StateRequestHandlers to define handler interfaces for
> other state types, e.g. MultimapSideInputHandler for
> MultimapSideInput, BagUserStateHandler for BagUserState, etc.[3] This is
> great since each runner can implement these handler interfaces then the Fn
> state API integration is done.
>
> In order to support MapState/SetState, I think we will need to provide
> a MultimapUserStateHandler interface in StateRequestHandlers and allow the
> runners to implement it.
>
> What do you think?
>
> Feel free to correct me if there is any incorrect understanding since I'm
> new to the Beam world.
>
> Btw, I saw Flink Python used MultimapSideInput to support MapState[4] but
> I think this is not recommended since MultimapUserState is available today.
> But please correct me if I'm wrong.
>
>
> [1] https://s.apache.org/beam-fn-state-api-and-bundle-processing
> 
> [2] https://github.com/apache/beam/pull/15238
> [3]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java#L192
> [4]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> --
> Thanks,
> Alan
>


-- 
Thanks,
Alan


Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Kenneth Knowles
My suggestion is to try to solve the problem in terms of what you want to
compute. Instead of trying to control the operational aspects like "read
all the BQ before reading Pubsub" there is presumably some reason that the
BQ data naturally "comes first", for example if its timestamps are earlier
or if there is a join or an aggregation that must include it. Whenever you
think you want to set up an operational dependency between two things that
"happen" in a pipeline, it is often best to pivot your thinking to the data
and what you are trying to compute, and the built-in dependencies will
solve the ordering problems.

So - is there a way to describe your problem in terms of the data and what
you are trying to compute?
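
As a minimal illustration of that shift in thinking, the plain-Java sketch
below (not Beam code) treats the BigQuery rows as reference data that each
Pub/Sub event must be joined with, keyed by some shared key. Events that
arrive before their reference row are buffered and emitted once the row shows
up, so the result does not depend on which source is read first. KeyedJoiner,
onReference, onEvent, and the sample values are all hypothetical names chosen
for this example; in a real pipeline a stateful DoFn or a CoGroupByKey might
play this role, depending on the data.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-key join: output depends on the data, not on arrival order.
final class KeyedJoiner {
  private final Map<String, String> reference = new HashMap<>();     // key -> reference row
  private final Map<String, List<String>> pending = new HashMap<>(); // key -> buffered events
  private final List<String> output = new ArrayList<>();

  // Called for each reference row (for example, a row read from BigQuery).
  void onReference(String key, String row) {
    reference.put(key, row);
    List<String> buffered = pending.remove(key);
    if (buffered != null) {
      for (String event : buffered) {
        output.add(join(row, event)); // flush events that were waiting on this key
      }
    }
  }

  // Called for each live event (for example, a Pub/Sub message).
  void onEvent(String key, String event) {
    String row = reference.get(key);
    if (row == null) {
      // Reference row not seen yet: hold the event instead of processing it early.
      pending.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    } else {
      output.add(join(row, event));
    }
  }

  List<String> output() {
    return output;
  }

  private static String join(String row, String event) {
    return event + " joined with " + row;
  }
}

final class KeyedJoinerDemo {
  public static void main(String[] args) {
    KeyedJoiner referenceFirst = new KeyedJoiner();
    referenceFirst.onReference("k1", "rowA");
    referenceFirst.onEvent("k1", "e1");

    KeyedJoiner eventFirst = new KeyedJoiner();
    eventFirst.onEvent("k1", "e1");
    eventFirst.onReference("k1", "rowA");

    List<String> a = new ArrayList<>(referenceFirst.output());
    List<String> b = new ArrayList<>(eventFirst.output());
    Collections.sort(a);
    Collections.sort(b);
    System.out.println(a.equals(b)); // both arrival orders yield the same joined records
  }
}
```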

Kenn

On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev 
wrote:

> First PCollections are completely unordered, so there is no guarantee on
> what order you'll see events in the flattened PCollection.
>
> There may be ways to process the BigQuery data in a separate transform
> first, but it depends on the structure of the data. How large is the
> BigQuery table? Are you doing any windowed aggregations here?
>
> Reuven
>
> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak 
> wrote:
>
>> Yes, this is a streaming pipeline.
>>
>> Some more details about existing implementation v/s what we want to
>> achieve.
>>
>> Current implementation:
>> Reading from pub-sub:
>>
>> Pipeline input = Pipeline.create(options);
>>
>> PCollection pubsubStream = input.apply("Read From Pubsub", 
>> PubsubIO.readMessagesWithAttributesAndMessageId()
>>
>> .fromSubscription(inputSubscriptionId))
>>
>>
>> Reading from bigquery:
>>
>> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
>> .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>
>> .apply("JSon Transform", AsJsons.of(TableRow.class));
>>
>>
>> Merge the inputs:
>>
>> PCollection mergedInput = 
>> PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input", 
>> Flatten.pCollections());
>>
>>
>>
>> Business Logic:
>>
>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()))
>>
>>
>>
>> Above logic is what we use currently in our pipeline.
>>
>> We want to make sure that we read from BigQuery first & pass the bqStream 
>> through our BusinessLogic() before we start consuming pubsubStream.
>>
>> Is there a way to achieve this?
>>
>>
>> Thanks,
>>
>> Sahil
>>
>>
>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>>
>>> Can you explain this use case some more? Is this a streaming pipeline?
>>> If so, how are you reading from BigQuery?
>>>
>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> We have a requirement wherein we are consuming input from pub/sub
>>>> (PubSubIO) as well as BQ (BQIO)
>>>>
>>>> We want to make sure that we consume the BQ stream first before we
>>>> start consuming the data from pub-sub. Is there a way to achieve this? Can
>>>> you please help with some code samples?
>>>>
>>>> Currently, we read data from big query using BigQueryIO into a
>>>> PCollection & also read data from pubsub using PubsubIO. We then use the
>>>> flatten transform in this manner.
>>>>
>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>
>>>> kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>>>>     .apply("Merge Input", Flatten.pCollections());
>>>>
>>>>
>>>> Thanks,
>>>> Sahil




Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Reuven Lax via dev
First, PCollections are completely unordered, so there is no guarantee on
what order you'll see events in the flattened PCollection.

There may be ways to process the BigQuery data in a separate transform
first, but it depends on the structure of the data. How large is the
BigQuery table? Are you doing any windowed aggregations here?

Reuven

On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak 
wrote:

> Yes, this is a streaming pipeline.
>
> Some more details about existing implementation v/s what we want to
> achieve.
>
> Current implementation:
> Reading from pub-sub:
>
> Pipeline input = Pipeline.create(options);
>
> PCollection pubsubStream = input.apply("Read From Pubsub", 
> PubsubIO.readMessagesWithAttributesAndMessageId()
>
> .fromSubscription(inputSubscriptionId))
>
>
> Reading from bigquery:
>
> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
> .readTableRows().fromQuery(bqQuery).usingStandardSql())
>
> .apply("JSon Transform", AsJsons.of(TableRow.class));
>
>
> Merge the inputs:
>
> PCollection mergedInput = 
> PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input", 
> Flatten.pCollections());
>
>
>
> Business Logic:
>
> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()))
>
>
>
> Above logic is what we use currently in our pipeline.
>
> We want to make sure that we read from BigQuery first & pass the bqStream 
> through our BusinessLogic() before we start consuming pubsubStream.
>
> Is there a way to achieve this?
>
>
> Thanks,
>
> Sahil
>
>
> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>
>> Can you explain this use case some more? Is this a streaming pipeline? If
>> so, how are you reading from BigQuery?
>>
>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev 
>> wrote:
>>
>>> Hi,
>>>
>>> We have a requirement wherein we are consuming input from pub/sub
>>> (PubSubIO) as well as BQ (BQIO)
>>>
>>> We want to make sure that we consume the BQ stream first before we start
>>> consuming the data from pub-sub. Is there a way to achieve this? Can you
>>> please help with some code samples?
>>>
>>> Currently, we read data from big query using BigQueryIO into a
>>> PCollection & also read data from pubsub using PubsubIO. We then use the
>>> flatten transform in this manner.
>>>
>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>
>>> kvPairs = 
>>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input", 
>>> Flatten.pCollections());
>>>
>>>
>>> Thanks,
>>> Sahil
>>>
>>>


Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Sahil Modak via dev
Yes, this is a streaming pipeline.

Some more details about the existing implementation vs. what we want to
achieve.

Current implementation:
Reading from pub-sub:

Pipeline input = Pipeline.create(options);

PCollection pubsubStream = input.apply("Read From Pubsub",
    PubsubIO.readMessagesWithAttributesAndMessageId()
        .fromSubscription(inputSubscriptionId));


Reading from bigquery:

PCollection bqStream = input
    .apply("Read from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("JSon Transform", AsJsons.of(TableRow.class));


Merge the inputs:

PCollection mergedInput = PCollectionList.of(pubsubStream).and(bqStream)
    .apply("Merge Input", Flatten.pCollections());



Business Logic:

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));



Above logic is what we use currently in our pipeline.

We want to make sure that we read from BigQuery first & pass the
bqStream through our BusinessLogic() before we start consuming
pubsubStream.

Is there a way to achieve this?


Thanks,

Sahil


On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:

> Can you explain this use case some more? Is this a streaming pipeline? If
> so, how are you reading from BigQuery?
>
> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev 
> wrote:
>
>> Hi,
>>
>> We have a requirement wherein we are consuming input from pub/sub
>> (PubSubIO) as well as BQ (BQIO)
>>
>> We want to make sure that we consume the BQ stream first before we start
>> consuming the data from pub-sub. Is there a way to achieve this? Can you
>> please help with some code samples?
>>
>> Currently, we read data from big query using BigQueryIO into a
>> PCollection & also read data from pubsub using PubsubIO. We then use the
>> flatten transform in this manner.
>>
>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>
>> kvPairs = 
>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input", 
>> Flatten.pCollections());
>>
>>
>> Thanks,
>> Sahil
>>
>>


Beam High Priority Issue Report (36)

2023-02-24 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 is flaky
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19465 Explore possibilities to lower 
in-use IP address quota footprint.
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/25412 [Feature Request]: Google Cloud 
Bigtable Change Stream Connector
https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21645 
beam_PostCommit_XVR_GoUsingJava_Dataflow fails on some test transforms
https://github.com/apache/beam/issues/21476