Re: Beam Website Feedback: Apache Beam WordCount Examples

2022-10-25 Thread Ahmet Altay via dev
Thank you for reporting, William.

Adding @Valentyn Tymofieiev. Perhaps we could start
by creating an issue?

On Mon, Oct 24, 2022 at 9:25 AM William Pietri 
wrote:

> Hi! I have inherited code using Apache Beam and I'm trying to figure out
> the right way to use it.
>
> This seemed like it would be a good page for me:
> https://beam.apache.org/get-started/wordcount-example/
>
> I jumped in with the MinimalWordCount example
> ,
> which says it's explaining this python code
> .
> However, the explanation doesn't match the code.
>
> The code from the text goes like this:
>
>
> pipeline = beam.Pipeline(options=beam_options)
>
> (
>     pipeline
>     | beam.io.ReadFromText(input_file)
>     | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>     | beam.combiners.Count.PerElement()
>     | beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
>     | beam.io.WriteToText(output_path))
>
> Fair enough, I'd say. However, the code in the example file is quite
> different:
>
>   with beam.Pipeline(options=pipeline_options) as p:
>     # Read the text file[pattern] into a PCollection.
>     lines = p | ReadFromText(known_args.input)
>
>     # Count the occurrences of each word.
>     counts = (
>         lines
>         | 'Split' >> (
>             beam.FlatMap(
>                 lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str))
>         | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
>         | 'GroupAndSum' >> beam.CombinePerKey(sum))
>
>     # Format the counts into a PCollection of strings.
>     def format_result(word_count):
>       (word, count) = word_count
>       return '%s: %s' % (word, count)
>
>     output = counts | 'Format' >> beam.Map(format_result)
>
>     # Write the output using a "Write" transform that has side effects.
>     # pylint: disable=expression-not-assigned
>     output | WriteToText(known_args.output)
>
>
> I assume this is also probably good? But there are a number of differences
> here in both structure and content. This confusion is exactly what I don't
> need in intro documentation, so I'd love it if somebody made this
> consistent.
>
> Thanks,
>
> William
>

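For reference, the word-splitting step that both snippets share can be sanity-checked in plain Python, with no Beam runner involved (the sample sentence below is illustrative, not from the docs):

```python
import re

def extract_words(line):
    # The word-extraction regex used by both WordCount variants:
    # runs of ASCII letters, with apostrophes kept inside words.
    return re.findall(r"[A-Za-z']+", line)

print(extract_words("Hello, world! Don't stop."))
# → ['Hello', 'world', "Don't", 'stop']
```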

Re: [CFP] In-person Beam meetups

2022-10-25 Thread Aizhamal Nurmamat kyzy
Hi all,

An in-person Beam meetup is taking place in the Bay Area on Nov 2. RSVP here:
https://www.meetup.com/aittg-sfsv/events/288939880/

We will share details on the NYC and Bangalore meetups soon!


On Tue, Oct 11, 2022 at 10:24 AM Aizhamal Nurmamat kyzy 
wrote:

> Hey Beam community,
>
> We are organizing a few in-person community meetups for Apache Beam and
> opening up calls for proposals. Please send me directly the topics / ideas
> you'd like to present at the following dates and locations:
>
> November 2, Santa Clara, CA
> November 10, NYC
> November 12, Bangalore, India
>
> The typical agenda will look as follows:
> 5:30pm~6pm: check in and food
> 6:00pm~8pm: 2~3 tech talks
> 8pm~8:30pm: mingle and close
>
> Thanks!
>


On wrestling with beam/issues/21104

2022-10-25 Thread Pablo Estrada via dev
Hi all,
I've spent a few days trying to narrow down whatever is going on with
https://github.com/apache/beam/issues/21104.

The bug reproduces frequently on Jenkins and very rarely on my laptop
(sometimes ~20 min, sometimes hours, sometimes I give up after it doesn't
reproduce all night).

Seeing as how it reproduces so frequently in our Jenkins env (see:
https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/), I would like
to do the following:

1. Add debug information to log as much context as possible when it occurs
2. Create a bug blocking the next release to revert the change
3. Revert before the next release

And hopefully this experiment will let us gather more information about the
bug.

Is everyone in favor of this? Is anyone against? Comments?
Thanks!
-P.


Re: Questions on primitive transforms hierarchy

2022-10-25 Thread Jan Lukavský
> Not quite IMO. It is a subtle difference. Perhaps these transforms 
can be *implemented* using stateful DoFn, but defining their semantics 
directly at a high level is more powerful. The higher level we can make 
transforms, the more flexibility we have in the runners. You *could* 
suggest that we take the same approach as we do with Combine: not a 
primitive, but a special transform that we optimize. You could say that 
"vanilla ParDo" is a composite that has a stateful ParDo implementation, 
but a runner can implement the composite more efficiently (without a 
shuffle). Same with CoGBK. You could say that there is a default 
expansion of CoGBK that uses stateful DoFn (which implies a shuffle) but 
that smart runners will not use that expansion.


Yes, semantics > optimizations. For optimizations Beam already has a 
facility: PTransformOverride. There is no fundamental difference in 
how we treat Combine w.r.t. GBK. It *can* be expanded using GBK, but "smart 
runners will not use that expansion". This is essentially the root of 
this discussion.


If I rephrase it:

 a) why do we treat some actually-composite transforms as primitives, 
while others have expansions, although the fundamental reasoning 
(performance) seems the same for both?


 b) is there a fundamental reason why we do not support stateful DoFn 
for merging windows?


I feel that these are related and have historical reasons, but I'd like 
to know that for sure. :)


 Jan

On 10/24/22 19:59, Kenneth Knowles wrote:



On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:

On 10/22/22 21:47, Reuven Lax via dev wrote:

I think we stated that CoGroupbyKey was also a primitive, though
in practice it's implemented in terms of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles 
wrote:



On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
 wrote:

Hi,

I have some missing pieces in my understanding of the set
of Beam's primitive transforms, which I'd like to fill.
First a quick recap of what I think is the current state.
We have (basically) the following primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that runners
can execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK into ReduceFn
(ReduceFnRunner), which does the actual logic for both
GBK and stateful DoFn.


ReduceFnRunner is for windowing / triggers and has special
feature to use a CombineFn while doing it. Nothing to do with
stateful DoFn.


My bad, wrong wording. The point was that *all* of the semantics
of GBK and Combine can be defined in terms of stateful DoFn. There
are some changes needed to stateful DoFn to support the Combine
functionality. But as mentioned above - optimization is orthogonal
to semantics.


Not quite IMO. It is a subtle difference. Perhaps these transforms can 
be *implemented* using stateful DoFn, but defining their semantics 
directly at a high level is more powerful. The higher level we can 
make transforms, the more flexibility we have in the runners. You 
*could* suggest that we take the same approach as we do with Combine: 
not a primitive, but a special transform that we optimize. You could 
say that "vanilla ParDo" is a composite that has a stateful ParDo 
implementation, but a runner can implement the composite more 
efficiently (without a shuffle). Same with CoGBK. You could say that 
there is a default expansion of CoGBK that uses stateful DoFn (which 
implies a shuffle) but that smart runners will not use that expansion.



I'll compare this to the set of transforms we used to use
in Euphoria (currently java SDK extension):

 - FlatMap ~~ stateless DoFn

 - Union ~~ Flatten

 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window


Stateful DoFn does not require associative or commutative
operation, while reduce/combine does. Windowing is really
just a secondary key for GBK/Combine that allows completion
of unbounded aggregations but has no computation associated
with it.


Merging WindowFn contains some computation. The fact that stateful
DoFn does not require a specific form of reduce function is precisely
what makes it the actual primitive, no?
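A tiny illustration of that point, in plain Python rather than the Beam API: an order-dependent per-key aggregate (here an exponential moving average; `alpha` is an arbitrary illustrative parameter) fits the stateful-DoFn model but cannot be a CombineFn, since it is neither associative nor commutative:

```python
def stateful_ema(kvs, alpha=0.5):
    # Per-key mutable state, updated element by element in arrival order,
    # the way a stateful DoFn processes its input. The result depends on
    # element order, so no associative/commutative CombineFn can express it.
    state = {}
    out = []
    for k, v in kvs:
        prev = state.get(k)
        state[k] = v if prev is None else alpha * v + (1 - alpha) * prev
        out.append((k, state[k]))
    return out

print(stateful_ema([('a', 1.0), ('a', 3.0), ('b', 2.0)]))
# → [('a', 1.0), ('a', 2.0), ('b', 2.0)]
```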



 - (missing Impulse)


Then you must have some primitive sources with splitting?

 - (missing splittable DoFn)


Kind of the same question - SDF is the one and only primitive
that creates parallelism.


Or

Beam High Priority Issue Report (42)

2022-10-25 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P0 Issues:

https://github.com/apache/beam/issues/23794 [Bug]: Storage Write API client 
hanging forever on shutdown


Unassigned P1 Issues:

https://github.com/apache/beam/issues/23815 [Bug]: Neo4j tests failing
https://github.com/apache/beam/issues/23813 [Bug]: PostRelease_NightlySnapshot 
failing
https://github.com/apache/beam/issues/23745 [Bug]: Samza 
AsyncDoFnRunnerTest.testSimplePipeline is flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/23693 [Bug]: apache_beam.io.kinesis 
module READ_DATA_URN mismatch
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22321 
PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing 
on jenkins
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21561 
ExternalPythonTransformTest.trivialPythonTransform flaky
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21462 Flake in 
org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21261 
org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
 is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21123 Multiple jobs running on Flink 
session cluster reuse the persistent Python environment.
https://github.com/apache/beam/issues/21113 
testTwoTimersSettingEachOtherWithCreateAsInputBounded flaky
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20975 
org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
false] is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19734 
WatchTest.testMultiplePollsWithManyResults flake: Outputs must be in timestamp 
order (sickbayed)
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/23627 [Bug]: Website precommit flaky
https://github.com/apache/beam/issues/23489 [Bug]: add DebeziumIO to the 
connectors page
https://github.com/apache/beam/issues/23306 [Bug]: BigQueryBatchFileLoads in 
python loses data when using WRITE_TRUNCATE
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22891 [Bug]: 
beam_PostCommit_XVR_PythonUsingJavaDataflow is flaky
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/22011 [Bug]: 
org.apache.beam.sdk.io.aws2.kinesis.KinesisIOWriteTest.testWriteFailure flaky
https://github.com/apache/beam/issues/21893 [Bug]: BigQuery Storage Write API 
implementation does not support table partitioning
https://github.com/apache/beam/issues/21711 Python Streaming job failing to 
drain with BigQueryIO write errors
https://github.com/apache/beam/issues/21709 
beam_PostCommit_Java_ValidatesRunner_Samza Failing
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21707 GroupByKeyTest BasicTests 
testLargeKeys100MB flake (on ULR)
https://github.com/apache/beam/issues/21700 
--dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21695 Da