Re: SDK Harness Memory Usage

2022-12-13 Thread Arwin Tio via dev
Hi Beam Team,

Bump on this. Does this question make sense?

Thanks,

Arwin

On Thu, Dec 8, 2022, 2:22 PM Arwin Tio  wrote:

> Hi Beam Team,
>
> Can somebody help me understand the factors behind SDK Harness memory
> usage? My first guess is that the SDK Harness memory usage depends
> on:
>
> 1. User code (i.e. DoFns)
> 2. Bundle size
>
> Basically, the maximum memory an SDK Harness needs is however much it
> takes for the user DoFn to process the largest bundle. And the bundle size
> is determined by the Runner. So to limit SDK Harness memory usage, we have
> to ensure that our Runner selects small bundle sizes.
>
> However, looking through some design docs and the code, it seems like:
>
>- sdk_worker.py seems to have multiple active bundle processors at the
>same time
>- The "Fn API: How to send and receive data" design doc seems to describe
>multiplexing multiple logical streams over a gRPC connection
>
> Does this mean that the SDK Harnesses process multiple bundles at the same
> time? If so, how are the number of concurrent bundles limited?
>
> Or in general, what suggestions do you have to reduce memory usage of SDK
> Harnesses?
>
> Thanks,
>
> Arwin
>
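
For intuition on the question above, here is a minimal sketch of how a harness could bound concurrent bundle processing with a semaphore. This is purely illustrative (the class and names below are hypothetical, not the actual sdk_worker.py implementation):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BoundedBundleProcessor:
    """Illustrative only: caps concurrent bundle processing.

    Each in-flight bundle retains its input in memory, so peak memory
    is roughly max_concurrent_bundles x (bundle size x per-element
    DoFn overhead).
    """

    def __init__(self, max_concurrent_bundles):
        self._semaphore = threading.Semaphore(max_concurrent_bundles)

    def process_bundle(self, bundle, do_fn):
        # Block until a processing slot frees up, bounding memory use.
        with self._semaphore:
            return [do_fn(element) for element in bundle]

processor = BoundedBundleProcessor(max_concurrent_bundles=2)
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [
        pool.submit(processor.process_bundle, [1, 2, 3], lambda x: x * x)
        for _ in range(4)
    ]
    results = [f.result() for f in futures]
```

Under this scheme, even if the runner multiplexes many logical streams over one gRPC connection, at most two bundles are held in memory at once.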



Re: @RequiresStableInput and Pipeline fusion

2022-12-13 Thread Luke Cwik via dev
This is definitely not working for portable pipelines, since the
GreedyPipelineFuser doesn't create a fusion boundary, which, as you pointed
out, results in a single stage containing a non-deterministic function
followed by one that requires stable input. It seems as though we should
have runners check the requirements on the Pipeline [1] to ensure that they
can faithfully execute such a pipeline and reject anything they don't
support early on.

Making the GreedyPipelineFuser insert that fusion break is likely the way
to go. Runners should be able to look at the ParDoPayload
requires_stable_input field for the ExecutableStage to see if any special
handling is necessary on their end before they pass data to that stage.

[1]:
https://github.com/apache/beam/blob/77af3237521d94f0399ab405ebac09bbbeded38c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L111
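
As a rough sketch of that early check (the helper below is an illustrative assumption, not any specific runner's code; the URN matches the stable-input requirement declared in beam_runner_api.proto), a runner could compare the pipeline's declared requirements against the set it supports:

```python
# Requirement URNs are declared in the pipeline proto's `requirements`
# field; this URN is believed to match StandardRequirements, but treat
# the exact string as an assumption for this sketch.
REQUIRES_STABLE_INPUT_URN = "beam:requirement:pardo:stable_input:v1"

def check_pipeline_requirements(pipeline_requirements, supported_urns):
    """Reject pipelines declaring requirements the runner can't honor."""
    unsupported = set(pipeline_requirements) - set(supported_urns)
    if unsupported:
        raise ValueError(
            "Runner cannot faithfully execute this pipeline; "
            "unsupported requirements: %s" % sorted(unsupported))

# A runner that does not implement stable input rejects early:
try:
    check_pipeline_requirements(
        pipeline_requirements=[REQUIRES_STABLE_INPUT_URN],
        supported_urns=[])
    rejected = False
except ValueError:
    rejected = True
```

Failing fast like this at submission time is cheaper than silently producing unstable input at runtime.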


On Tue, Dec 13, 2022 at 1:44 AM Jan Lukavský  wrote:

> Hi,
>
> I have a question about @RequiresStableInput functionality. I'm trying to
> make it work for portable Flink runner [1], [2]. We have an integration
> test (which should probably be turned into Validates runner test, but that
> is a different story) [3]. The test creates a random key for each input
> element, processes it once, fails the pipeline, and then reprocesses it.
> This works well provided there is a checkpoint (a shuffle, in the case of
> Dataflow) exactly between assigning the random key (via PairWithRandomKeyFn)
> and processing it (via MakeSideEffectAndThenFailFn).
>
> The problem is that the GreedyPipelineFuser fuses the transforms
> PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single
> ExecutableStage. This stage is then executed with the @RequiresStableInput
> requirement, but it obviously assigns a different key to the reprocessed
> element(s). It looks like we need to fix this in the PipelineFuser; is
> that right? Does this mean the @RequiresStableInput functionality is
> actually broken for all runners that use the default fusion?
>
> Another possibility is that we need to fix the test by adding an explicit
> reshuffle (verified, this works), but I think that the test is actually
> correct: users would probably not expect transforms to be fused across
> the @RequiresStableInput boundary.
>
> Thoughts?
>
>  Jan
>
>
> [1] https://github.com/apache/beam/issues/20812
> [2] https://github.com/apache/beam/pull/22889
> [3]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
>
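
To illustrate the fusion break discussed above, a greedy fuser can start a new stage whenever the next transform requires stable input. This sketch uses hypothetical names and a linear chain only; it is not the GreedyPipelineFuser implementation:

```python
def fuse_into_stages(transforms, requires_stable_input):
    """Greedily fuse a linear chain of transforms into stages, breaking
    fusion before any transform that requires stable input so the runner
    can checkpoint its input.

    transforms: ordered list of transform names.
    requires_stable_input: set of transform names needing stable input.
    """
    stages, current = [], []
    for name in transforms:
        # A stable-input consumer must start a fresh stage: its input
        # then comes from a materialized (replayable) boundary rather
        # than from re-running upstream, possibly non-deterministic,
        # DoFns on retry.
        if name in requires_stable_input and current:
            stages.append(current)
            current = []
        current.append(name)
    if current:
        stages.append(current)
    return stages

stages = fuse_into_stages(
    ["PairWithRandomKeyFn", "MakeSideEffectAndThenFailFn"],
    requires_stable_input={"MakeSideEffectAndThenFailFn"})
# Two stages: the random-key assignment is not fused with the
# stable-input consumer.
```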


Re: [Proposal] Adopt a Beam I/O Standard

2022-12-13 Thread Sachin Agarwal via dev
It would be helpful to explain the scope here: if the previous iteration
was too heavyweight, it would be good to be intentional about what's in
scope this time.

I think all would agree that being more prescriptive would help IO makers
(especially those from startups looking to expand their reach).

On Mon, Dec 12, 2022 at 7:32 PM Chamikara Jayalath 
wrote:

> Yeah, I don't think we either finalized or documented (on the website) the
> previous iteration. This doc seems to contain details from the documents
> shared in the previous iteration.
>
> Thanks,
> Cham
>
>
>
> On Mon, Dec 12, 2022 at 6:49 PM Robert Burke  wrote:
>
>> I think ultimately: until the docs are clearly available on the Beam site
>> itself, it's not documentation. See also, design docs, previous emails, and
>> similar.
>>
>> On Mon, Dec 12, 2022, 6:07 PM Andrew Pilloud via dev 
>> wrote:
>>
>>> I believe the previous iteration was here:
>>> https://lists.apache.org/thread/3o8glwkn70kqjrf6wm4dyf8bt27s52hk
>>>
>>> The associated docs are:
>>> https://s.apache.org/beam-io-api-standard-documentation
>>> https://s.apache.org/beam-io-api-standard
>>>
>>> This is missing all the relational stuff that was in those docs; this
>>> appears to be another attempt starting from the beginning?
>>>
>>> Andrew
>>>
>>>
>>> On Mon, Dec 12, 2022 at 9:57 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 Thanks for writing this!

 IIRC, a similar design doc was sent for review here a while ago. Is
 this just an updated version, or a new one?

 —
 Alexey

 On 11 Dec 2022, at 15:16, Herman Mak via dev 
 wrote:

 Hello Everyone,

 *TLDR*

 Should we adopt a set of standards that Connector I/Os should adhere
 to?
 Attached is a first version of a Beam I/O Standards guideline that
 includes opinionated best practices across important components of a
 Connector I/O, namely Documentation, Development and Testing.

 *The Long Version*

 Apache Beam is a unified open-source programming model for both batch
 and streaming. It runs on multiple platform runners and integrates with
 over 50 services using individually developed I/O Connectors.

 Given that Apache Beam connectors are written by many different
 developers and at varying points in time, they vary in syntax style,
 documentation completeness and testing done. For a new adopter of Apache
 Beam, that can definitely cause some uncertainty.

 So should we adopt a set of standards that Connector I/Os should adhere
 to?
 Attached is a first version, in Doc format, of a Beam I/O Standards
 guideline that includes opinionated best practices across important
 components of a Connector I/O, namely Documentation, Development and
 Testing. And the aim is to incorporate this into the documentation and to
 have it referenced as standards for new Connector I/Os (and ideally have
 existing Connectors upgraded over time). If it looks helpful, the immediate
 next step is that we can convert it into a .md as a PR into the Beam repo!

 Thanks, and looking forward to feedback and discussion,

  [PUBLIC] Beam I/O Standards
 

 Herman Mak |  Customer Engineer, Hong Kong, Google Cloud |
 herman...@google.com |  +852-3923-5417 <+852%203923%205417>






Beam High Priority Issue Report (30)

2022-12-13 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24383 [Bug]: Daemon will be stopped at 
the end of the build after the daemon was no longer found in the daemon registry
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/23286 [Bug]: 
beam_PerformanceTests_InfluxDbIO_IT Flaky > 50 % Fail 
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21474 Flaky tests: Gradle build daemon 
disappeared unexpectedly
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19465 Explore possibilities to lower 
in-use IP address quota footprint.
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/24464 [Epic]: Implement 
FileWriteSchemaTransformProvider
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests




@RequiresStableInput and Pipeline fusion

2022-12-13 Thread Jan Lukavský

Hi,

I have a question about @RequiresStableInput functionality. I'm trying 
to make it work for the portable Flink runner [1], [2]. We have an 
integration test (which should probably be turned into a Validates runner 
test, but that is a different story) [3]. The test creates a random key 
for each input element, processes it once, fails the pipeline, and then 
reprocesses it. This works well provided there is a checkpoint (a 
shuffle, in the case of Dataflow) exactly between assigning the random 
key (via PairWithRandomKeyFn) and processing it (via 
MakeSideEffectAndThenFailFn).


The problem is that the GreedyPipelineFuser fuses the transforms 
PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single 
ExecutableStage. This stage is then executed with the 
@RequiresStableInput requirement, but it obviously assigns a different 
key to the reprocessed element(s). It looks like we need to fix this in 
the PipelineFuser; is that right? Does this mean the @RequiresStableInput 
functionality is actually broken for all runners that use the default 
fusion?


Another possibility is that we need to fix the test by adding an explicit 
reshuffle (verified, this works), but I think that the test is actually 
correct: users would probably not expect transforms to be fused across 
the @RequiresStableInput boundary.
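
To see why a materialization boundary matters here, consider this toy simulation (pure Python, purely illustrative; a counter stands in for the non-determinism of the random key) of retrying with and without a checkpoint between key assignment and the side-effecting step:

```python
import itertools

# Stand-in for a non-deterministic function: yields a new key on every
# call (a counter simulates the non-determinism deterministically).
_key_source = itertools.count()

def pair_with_random_key(element):
    return ("key-%d" % next(_key_source), element)

def process_with_retry(element, checkpoint_between):
    """Process `element`, fail once, and retry (two attempts).

    With checkpoint_between=True the keyed element is materialized
    before the side-effecting step, so the retry replays the same key
    (stable input). Without it, key assignment is re-run on retry.
    """
    if checkpoint_between:
        keyed = pair_with_random_key(element)  # materialized once
        attempts = [keyed for _ in range(2)]   # retry replays it
    else:
        attempts = [pair_with_random_key(element) for _ in range(2)]
    return [key for key, _ in attempts]

with_checkpoint = process_with_retry("e", checkpoint_between=True)
without_checkpoint = process_with_retry("e", checkpoint_between=False)
```

When the two steps are fused into one stage, execution behaves like the second branch: the retry observes a different key, which is exactly the failure mode the test exercises.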


Thoughts?

 Jan


[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java