Re: Unbounded stream to FileIO.write

2020-05-14 Thread Luke Cwik
Many people use the local Flink runner as their production runner. The
direct runner doesn't have any "disaster" recovery options so you can only
kill the process and rerun the pipeline. Also, the direct runner doesn't
use any disk, so as long as your pipeline's working set fits in memory it
will work well. You'll want to turn off some features [1, 2] that make it
slower, since it performs extra validation intended to help users test
their pipelines.

1:
https://github.com/apache/beam/blob/4e47dea8283b5bf2d628bcd1642606816e78fd63/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L47
2:
https://github.com/apache/beam/blob/4e47dea8283b5bf2d628bcd1642606816e78fd63/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L55
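
For reference, a minimal sketch of disabling those two checks from code; they
should also be settable on the command line as --enforceEncodability=false and
--enforceImmutability=false. This is illustrative only and assumes the Beam
Java SDK:

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DirectRunnerTuning {
  public static void main(String[] args) {
    // Parse the usual pipeline args, then view them as DirectOptions.
    DirectOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DirectOptions.class);

    // Turn off the extra validation the direct runner performs to help testing.
    options.setEnforceEncodability(false);
    options.setEnforceImmutability(false);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline as usual, then:
    pipeline.run().waitUntilFinish();
  }
}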

On Thu, May 14, 2020 at 8:47 PM Nathan Fisher 
wrote:

> Awesome thanks Luke! I mirrored a subset of the data (20k records) to a
> local C* node and stripped out all of the windowing. It seems to be working
> fine in batch mode. I think it was the confluence of my inexperience with
> Beam and network latency that ultimately had me pulling out my hair.
>
> I started using the Direct Runner. Is there any guidance on the scale at
> which you'd advise against using it? To be honest, the local Flink runner
> was so easy I'm surprised it's not the default in the getting started
> documentation.
>
> On Thu, May 14, 2020 at 1:31 PM Luke Cwik  wrote:
>
>> You could still use streaming mode but you need to ensure that you use a
>> windowing strategy with a trigger that fires based upon processing time or
>> # records and not event time. This wouldn't require buffering all the data
>> as it is being read and would help with memory issues for runners that
>> don't have access to effectively infinite memory or offload to disk.
>>
>> On Wed, May 13, 2020 at 5:04 PM Nathan Fisher 
>> wrote:
>>
>>> Thanks Luke!
>>>
>>> When I started using the direct runner I was getting Out of Memory
>>> errors. I incorrectly thought toggling on streaming would help
>>> eliminate/minimise those errors. I started playing with windowing not
>>> realising a bounded IO would treat everything as the same window even with
>>> streaming on.
>>>
>>> I switched to the Flink runner after reading that the direct runner is
>>> actually intended to tease out pipeline issues. I’ve been using the Flink
>>> WebUI to monitor progress, which has been helpful for visualising what
>>> progress was made in the stream.
>>>
>>> What I would ultimately like to achieve is reading from Cassandra and
>>> writing the records to one or more Parquet files based on either count (Eg
>>> 10,000 records per file) and/or size on disk (Eg rotate to a new file when
>>> 64MB is exceeded).
>>>
>>> The environment I’m testing against right now is a relatively small test
>>> env with 100k records or so. Larger envs will be 1-100m.
>>>
>>> Regards,
>>> Nathan
>>>
>>> On Wed, May 13, 2020 at 19:25, Luke Cwik  wrote:
>>>
 Since CassandraIO is not an unbounded source, the watermark will never
 advance until all the data is read from Cassandra, which means that you
 will buffer all the data if you (or any transform you use) have any event
 time based windowing strategy when grouping. You could swap to use a
 processing time windowing strategy (via Window.into) but it is unclear that
 is what you want.

 CassandraIO is not an unbounded source so why do you want --streaming?
 Also, why do you want to window.into fixed windows of 30 seconds?


 On Tue, May 12, 2020 at 8:28 AM Nathan Fisher 
 wrote:

> Hi Folks,
>
> Cross-posting from the Slack channel from the other day.
>
> I started looking at Beam again over the weekend. I have an unbounded
> stream with a CassandraIO input and am trying to write files using
> FileIO and ParquetIO.
>
> I'm using the following:
>
> Beam: 2.20.0
> Flink Runner/Cluster: 1.9(.3)
>
> java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
> --sdkWorkerParallelism=0 --runner=FlinkRunner
>
> When submitting to a Flink cluster I include
> --flinkMaster=localhost:8081 in the command.
>
> If I replace the FileIO with a simple log writer it prints out the
> records and makes progress. Using the FileIO with ParquetIO it stalls
> on the stage 
> write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
> ->
> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
> -> write/WriteFiles/GatherTempFileResults/Add void
> key/AddKeys/Map/ParMultiDo(Anonymous).
>
> That brings me to ask the following questions:
>
>1. What's the best way to test and monitor a beam pipeline?
>2. What adjustments are required to get this pipeline writing
>files?
>3. Is there some kind of way to evaluate the DAG and identify
>scenarios 

Re: Unbounded stream to FileIO.write

2020-05-14 Thread Nathan Fisher
Awesome thanks Luke! I mirrored a subset of the data (20k records) to a
local C* node and stripped out all of the windowing. It seems to be working
fine in batch mode. I think it was the confluence of my inexperience with
Beam and network latency that ultimately had me pulling out my hair.

I started using the Direct Runner. Is there any guidance on the scale at
which you'd advise against using it? To be honest, the local Flink runner
was so easy I'm surprised it's not the default in the getting started
documentation.

On Thu, May 14, 2020 at 1:31 PM Luke Cwik  wrote:

> You could still use streaming mode but you need to ensure that you use a
> windowing strategy with a trigger that fires based upon processing time or
> # records and not event time. This wouldn't require buffering all the data
> as it is being read and would help with memory issues for runners that
> don't have access to effectively infinite memory or offload to disk.
>
> On Wed, May 13, 2020 at 5:04 PM Nathan Fisher 
> wrote:
>
>> Thanks Luke!
>>
>> When I started using the direct runner I was getting Out of Memory
>> errors. I incorrectly thought toggling on streaming would help
>> eliminate/minimise those errors. I started playing with windowing not
>> realising a bounded IO would treat everything as the same window even with
>> streaming on.
>>
>> I switched to the Flink runner after reading that the direct runner is
>> actually intended to tease out pipeline issues. I’ve been using the Flink
>> WebUI to monitor progress, which has been helpful for visualising what
>> progress was made in the stream.
>>
>> What I would ultimately like to achieve is reading from Cassandra and
>> writing the records to one or more Parquet files based on either count (Eg
>> 10,000 records per file) and/or size on disk (Eg rotate to a new file when
>> 64MB is exceeded).
>>
>> The environment I’m testing against right now is a relatively small test
>> env with 100k records or so. Larger envs will be 1-100m.
>>
>> Regards,
>> Nathan
>>
>> On Wed, May 13, 2020 at 19:25, Luke Cwik  wrote:
>>
>>> Since CassandraIO is not an unbounded source, the watermark will never
>>> advance until all the data is read from Cassandra, which means that you
>>> will buffer all the data if you (or any transform you use) have any event
>>> time based windowing strategy when grouping. You could swap to use a
>>> processing time windowing strategy (via Window.into) but it is unclear that
>>> is what you want.
>>>
>>> CassandraIO is not an unbounded source so why do you want --streaming?
>>> Also, why do you want to window.into fixed windows of 30 seconds?
>>>
>>>
>>> On Tue, May 12, 2020 at 8:28 AM Nathan Fisher 
>>> wrote:
>>>
 Hi Folks,

 Cross-posting from the Slack channel from the other day.

 I started looking at Beam again over the weekend. I have an unbounded
 stream with a CassandraIO input and am trying to write files using
 FileIO and ParquetIO.

 I'm using the following:

 Beam: 2.20.0
 Flink Runner/Cluster: 1.9(.3)

 java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
 --sdkWorkerParallelism=0 --runner=FlinkRunner

 When submitting to a Flink cluster I include
 --flinkMaster=localhost:8081 in the command.

 If I replace the FileIO with a simple log writer it prints out the
 records and makes progress. Using the FileIO with ParquetIO it stalls
 on the stage 
 write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
 ->
 write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
 -> write/WriteFiles/GatherTempFileResults/Add void
 key/AddKeys/Map/ParMultiDo(Anonymous).

 That brings me to ask the following questions:

1. What's the best way to test and monitor a beam pipeline?
2. What adjustments are required to get this pipeline writing files?
3. Is there some kind of way to evaluate the DAG and identify
scenarios where this stall is likely?

PipelineOptions pipelineOptions = PipelineOptionsFactory
 .fromArgs(args)
 .withValidation()
 .create();
 Pipeline p = Pipeline.create(pipelineOptions);
 CoderRegistry registry = p.getCoderRegistry();
 registry.registerCoderForClass(GenericRecord.class, AvroCoder.of(SCHEMA));
 PCollection<Metric> metrics = p.apply("cassandra",
 CassandraIO.<Metric>read()
 .withHosts(hosts)
 .withPort(9042)
 .withLocalDc("datacenter1")
 .withKeyspace(KEY_SPACE)
 .withTable(TABLE)
 .withMinNumberOfSplits(100)
 .withEntity(Metric.class)
 .withCoder(SerializableCoder.of(Metric.class)));
 metrics.apply("window",
 Window.into(
 FixedWindows.of(Duration.standardSeconds(30)))
 

Re: Try Beam Katas Today

2020-05-14 Thread Henry Suryawirawan
The guide for sharing the course (publishing to Stepik) can be found here:
https://www.jetbrains.com/help/education/educator-start-guide.html#share_course
It is well integrated into the IDE and takes only a few clicks.

Yeah the publication process is now distinct.
At the moment, when the PR for the course changes is approved, we update
the Stepik version, add & commit the auto-generated metadata YAML to the
PR, and then merge & close the PR.
Open to suggestions if there is a better way to publish.



On Thu, May 14, 2020 at 10:21 PM Austin Bennett 
wrote:

> It looks like there are instructions online for writing exercises/Katas:
> https://www.jetbrains.com/help/education/educator-start-guide.html
>
> Do we have a guide for contributing, and for how publication/releases occur
> (publishing to Stepik)?  Although the code lives in the main repo
> (and is therefore subject to those contrib guidelines), I think the
> release/publication schedule is distinct?
>
> This hopefully will help illustrate that we are able to contribute to
> Katas (PRs welcome?), and not just consume them!
>
>
>
> On Thu, May 14, 2020 at 1:41 AM Henry Suryawirawan <
> hsuryawira...@google.com> wrote:
>
>> Yeah, certainly we can expand it further.
>> There are more lessons that can definitely be added.
>>
>> >Eg more the write side windowing interactions?
>> Are you referring to Write IOs?
>>
>>
>>
>> On Wed, May 13, 2020 at 11:56 PM Nathan Fisher 
>> wrote:
>>
>>> I went through them earlier this week! Definitely helpful.
>>>
>>> Is it possible to expand the katas available in the IO section? Eg more
>>> the write side windowing interactions?
>>>
>>> On Wed, May 13, 2020 at 11:36, Luke Cwik  wrote:
>>>
 These are an excellent learning tool.

 On Tue, May 12, 2020 at 11:02 PM Pablo Estrada 
 wrote:

> Sharing Damon's email with the user@ list as well. Thanks Damon!
>
> On Tue, May 12, 2020 at 9:02 PM Damon Douglas 
> wrote:
>
>> Hello Everyone,
>>
>> If you don't already know, there are helpful instructional tools for
>> learning the Apache Beam SDKs called Beam Katas hosted on
>> https://stepik.org.  Similar to traditional Kata, they are meant to be repeated
>> as practice.  Before practicing the katas myself, I found myself
>> copy/pasting code (Please accept my confession  ).  Now I find myself
>> actually composing pipelines.  Just like kata forms, you find them 
>> becoming
>> part of you.  If you are interested, below are listed the current 
>> available
>> katas:
>>
>> 1.  Java - https://stepik.org/course/54530
>>
>> 2.  Python -  https://stepik.org/course/54532
>>
>> 3.  Go (in development) - https://stepik.org/course/70387
>>
>> If you are absolutely brand new to Beam and it scares you like it
>> scared me, come talk to me.
>>
>> Best,
>>
>> Damon
>>
> --
>>> Nathan Fisher
>>>  w: http://junctionbox.ca/
>>>
>>


Re: Try Beam Katas Today

2020-05-14 Thread Rion Williams
+1 on the contributions front. My team and I have been working with Beam
primarily in Kotlin. I recently added the appropriate dependencies to
Gradle, did a bit of conversion, and have it working as expected against
the existing Java course.

I don’t know how many others are actively working with Kotlin and Beam, but I’d 
love to work on transitioning that into a proper course (assuming there’s 
interest in it).

> On May 14, 2020, at 10:32 AM, Nathan Fisher  wrote:
> 
> 
> Yes write IO
> 
>> On Thu, May 14, 2020 at 05:41, Henry Suryawirawan  
>> wrote:
>> Yeah, certainly we can expand it further.
>> There are more lessons that can definitely be added.
>> 
>> >Eg more the write side windowing interactions?
>> Are you referring to Write IOs?
>> 
>> 
>> 
>>> On Wed, May 13, 2020 at 11:56 PM Nathan Fisher  
>>> wrote:
>>> I went through them earlier this week! Definitely helpful.
>>> 
>>> Is it possible to expand the katas available in the IO section? Eg more the 
>>> write side windowing interactions?
>>> 
 On Wed, May 13, 2020 at 11:36, Luke Cwik  wrote:
 These are an excellent learning tool.
 
> On Tue, May 12, 2020 at 11:02 PM Pablo Estrada  wrote:
> Sharing Damon's email with the user@ list as well. Thanks Damon!
> 
>> On Tue, May 12, 2020 at 9:02 PM Damon Douglas  
>> wrote:
>> Hello Everyone,
>> 
>> If you don't already know, there are helpful instructional tools for 
>> learning the Apache Beam SDKs called Beam Katas hosted on 
>> https://stepik.org.  Similar to traditional Kata, they are meant to be 
>> repeated as practice.  Before practicing the katas myself, I found 
>> myself copy/pasting code (Please accept my confession  ).  Now I find 
>> myself actually composing pipelines.  Just like kata forms, you find 
>> them becoming part of you.  If you are interested, below are listed the 
>> current available katas:
>> 
>> 1.  Java - https://stepik.org/course/54530
>> 
>> 2.  Python -  https://stepik.org/course/54532
>> 
>> 3.  Go (in development) - https://stepik.org/course/70387
>> 
>> If you are absolutely brand new to Beam and it scares you like it scared 
>> me, come talk to me.
>> 
>> Best,
>> 
>> Damon
>>> -- 
>>> Nathan Fisher
>>>  w: http://junctionbox.ca/
> -- 
> Nathan Fisher
>  w: http://junctionbox.ca/


Re: beam python on spark-runner

2020-05-14 Thread Kyle Weaver
Keep in mind that those instructions about spark-submit are meant only to
apply to the Java-only runner. For Python, running spark-submit in this
manner is not going to work.

See https://issues.apache.org/jira/browse/BEAM-8970

On Thu, May 14, 2020 at 2:55 PM Heejong Lee  wrote:

> How did you start spark job server and what version of Apache Beam SDK did
> you use?
>
> There were some protocol changes recently, so if the two versions don't
> match you may see gRPC errors. If you used the Gradle command on the latest
> HEAD to start the Spark job server, I would recommend checking out the
> source at the same version as the SDK you installed and trying again.
>
> On Wed, May 13, 2020 at 2:51 PM Naveen M  wrote:
>
>> Hi,
>>
>> I am trying to run a sample WordCount Beam job with the PortableRunner by
>> following the documentation here,
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> I want to run this as a spark-submit command with YARN resource manager.
>>
>> Can you please let me know what is missing here? Thanks for your help.
>>
>>
>> I tried the commands below and they are giving some weird errors:
>>
>>
>>
>>
>> spark-submit --master yarn --deploy-mode client --driver-memory 2g
>> --executor-memory 1g --executor-cores 1 WordCount.py --input ""
>> --output "" --runner PortableRunner --job_endpoint localhost:8099
>>
>>
>>
>>
>>
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py", line
>> 503, in __exit__
>>
>> self.run().wait_until_finish()
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py", line
>> 483, in run
>>
>> self._options).run(False)
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py", line
>> 496, in run
>>
>>return self.runner.run_pipeline(self, self._options)
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>> line 384, in run_pipeline
>>
>> job_service_plan.submit(proto_pipeline)
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>> line 105, in submit
>>
>> prepare_response.staging_session_token)
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>> line 190, in stage
>>
>> staging_location='')
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/stager.py",
>> line 229, in stage_job_resources
>>
>> self.stage_artifact(pickled_session_file, staged_path)
>>
>>   File
>> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_stager.py",
>> line 98, in stage_artifact
>>
>> self._artifact_staging_stub.PutArtifact(artifact_request_generator())
>>
>>   File "/usr/local/lib64/python3.7/site-packages/grpc/_channel.py", line
>> 1011, in __call__
>>
>> return _end_unary_response_blocking(state, call, False, None)
>>
>>   File "/usr/local/lib64/python3.7/site-packages/grpc/_channel.py", line
>> 729, in _end_unary_response_blocking
>>
>> raise _InactiveRpcError(state)
>>
>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that
>> terminated with:
>>
>> status = StatusCode.UNIMPLEMENTED
>>
>> details = "Method not found:
>> org.apache.beam.model.job_management.v1.ArtifactStagingService/PutArtifact"
>>
>> debug_error_string =
>> "{"created":"@1589406258.175447016","description":"Error received from peer
>> ipv6:[::1]:8098","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Method
>> not found:
>> org.apache.beam.model.job_management.v1.ArtifactStagingService/PutArtifact","grpc_status":12}"
>>
>


Re: beam python on spark-runner

2020-05-14 Thread Heejong Lee
How did you start spark job server and what version of Apache Beam SDK did
you use?

There were some protocol changes recently, so if the two versions don't
match you may see gRPC errors. If you used the Gradle command on the latest
HEAD to start the Spark job server, I would recommend checking out the
source at the same version as the SDK you installed and trying again.

On Wed, May 13, 2020 at 2:51 PM Naveen M  wrote:

> Hi,
>
> I am trying to run a sample WordCount Beam job with the PortableRunner by
> following the documentation here,
>
> https://beam.apache.org/documentation/runners/spark/
>
> I want to run this as a spark-submit command with YARN resource manager.
>
> Can you please let me know what is missing here? Thanks for your help.
>
>
> I tried the commands below and they are giving some weird errors:
>
>
>
>
> spark-submit --master yarn --deploy-mode client --driver-memory 2g
> --executor-memory 1g --executor-cores 1 WordCount.py --input ""
> --output "" --runner PortableRunner --job_endpoint localhost:8099
>
>
>
>
>
>
>   File "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py",
> line 503, in __exit__
>
> self.run().wait_until_finish()
>
>   File "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py",
> line 483, in run
>
> self._options).run(False)
>
>   File "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py",
> line 496, in run
>
>return self.runner.run_pipeline(self, self._options)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 384, in run_pipeline
>
> job_service_plan.submit(proto_pipeline)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 105, in submit
>
> prepare_response.staging_session_token)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 190, in stage
>
> staging_location='')
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/stager.py",
> line 229, in stage_job_resources
>
> self.stage_artifact(pickled_session_file, staged_path)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_stager.py",
> line 98, in stage_artifact
>
> self._artifact_staging_stub.PutArtifact(artifact_request_generator())
>
>   File "/usr/local/lib64/python3.7/site-packages/grpc/_channel.py", line
> 1011, in __call__
>
> return _end_unary_response_blocking(state, call, False, None)
>
>   File "/usr/local/lib64/python3.7/site-packages/grpc/_channel.py", line
> 729, in _end_unary_response_blocking
>
> raise _InactiveRpcError(state)
>
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated
> with:
>
> status = StatusCode.UNIMPLEMENTED
>
> details = "Method not found:
> org.apache.beam.model.job_management.v1.ArtifactStagingService/PutArtifact"
>
> debug_error_string =
> "{"created":"@1589406258.175447016","description":"Error received from peer
> ipv6:[::1]:8098","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Method
> not found:
> org.apache.beam.model.job_management.v1.ArtifactStagingService/PutArtifact","grpc_status":12}"
>


Re: KafkaIO.write dynamically fanout to different kafka topics

2020-05-14 Thread Eleanore Jin
Hi Alex,

Thanks a lot for the information!

Best
Eleanore

On Thu, May 14, 2020 at 1:53 AM Alexey Romanenko 
wrote:

> Hi Eleanore,
>
> Yes, to define the output topic dynamically for every record, you may want
> to use KafkaIO.writeRecords(), which takes a PCollection of ProducerRecord
> as an input; for every processed ProducerRecord it takes its topic name (if
> one was specified there) and uses it as the output topic. So, in this way,
> you can set for every record the topic name where it will be published.
>
> If I got your question right, you need an intermediate PTransform before
> KafkaIO.writeRecords() that uses information from a message field to decide
> the topic where your record should be published, and then creates a new
> ProducerRecord with the proper topic name.
>
> > On 14 May 2020, at 07:09, Eleanore Jin  wrote:
> >
> > Hi all,
> >
> > I have a Beam pipeline which reads from a Kafka topic via KafkaIO and,
> > based on a message field, adds an additional field to the message for the
> > destination topic.
> >
> > I see KafkaIO.write can be used to publish to Kafka topics.
> >
> > In KafkaIO.java, it constructs the ProducerRecord, and getTopic()
> > determines which topic to publish to; this information is passed when
> > creating PTransforms via KafkaIO.write.
> >
> > Any suggestions to dynamically set kafka topic from message field?
> >
> > Thanks a lot!
> > Eleanore
>
>


RE: Apache beam job on Flink checkpoint size growing over time

2020-05-14 Thread Stephen.Hesketh
Hi all,

This issue is linked to the fusion break we are using to distribute the
events from the simple single-threaded, unbounded source (which stores no
data in its checkpoint).

I have recreated on a simple pipeline that:

 - generates dummy events (just holding an id) from an UnboundedSource 
(returning Collections.singletonList(this) in the split to ensure active source 
on single thread)
 - uses the fusion break attached
 - has a dummy output DoFn

The checkpoint size steadily increases when running on the FlinkRunner. The 
pipeline options are the simple command line args below.

Any suggestions would be appreciated! Does Flink not require / like fusion 
breaks?
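
(For anyone trying to reproduce this without the attachment: a common way to
express such a fusion break in Beam Java is a Reshuffle. The snippet below is
only an illustrative stand-in for the attached transform; "events" and the
Event type stand for the dummy-event PCollection described above.)

import org.apache.beam.sdk.transforms.Reshuffle;

// Redistribute elements across workers by pairing them with random keys,
// grouping, and then dropping the keys again.
events.apply("FusionBreak", Reshuffle.<Event>viaRandomKey());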

Cheers,

Steve


--runner=flink
--flinkMaster=:
--streaming=true
--parallelism=1
--checkpointingInterval=2
--checkpointTimeoutMillis=1
--minPauseBetweenCheckpoints=1
--failOnCheckpointingErrors=false
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
--numberOfExecutionRetries=2
--executionRetryDelay=1000


Stephen Hesketh

-Original Message-
From: Hesketh, Stephen (Technology, NatWest Markets) 
Sent: 01 May 2020 15:04
To: m...@apache.org; user@beam.apache.org
Subject: RE: Apache beam job on Flink checkpoint size growing over time



Hi Max,

We could test  a patched BEAM Pipeline if it might help resolve the issue.

What would be involved in getting the patched version over? (We may need to get 
security approval to get it down loaded.)

Many thanks,

Steve


Stephen Hesketh


-Original Message-
From: Maximilian Michels [mailto:m...@apache.org] 
Sent: 30 April 2020 10:17
To: Hesketh, Stephen (Technology, NatWest Markets); user@beam.apache.org
Subject: Re: Apache beam job on Flink checkpoint size growing over time



Hey Steve,

The checkpoint buffer is cleared "lazily" when a new bundle is started.
Theoretically, if no more data was to arrive after a checkpoint, then
this buffer would not be cleared until more data arrived. If this
repeats every time new data comes in, then some data would always remain
buffered, which one could observe in the checkpoint state.

We could fix this by making sure the buffer is flushed when a checkpoint
completes, instead of merely flushing it "lazily" when new data arrives.

Do you have the option to test a patched Beam version?

Best,
Max

On 29.04.20 14:34, stephen.hesk...@natwestmarkets.com wrote:
> Hi Max,
> 
> The ingestion is covering EOD processing from a Kafka source, so we get a lot 
> of data from 5pm-8pm and outside of that time we get no data. The checkpoint 
> is just storing the Kafka offset for restart.
> 
> Sounds like during the period of no data there could be an open buffer. I 
> would have thought that would be cleared soon after data starts flowing again 
> though and wouldn't lead to an increase in checkpoint size over a number of 
> days.
> 
> Unless we are missing something in BEAM and aren't actually triggering a new 
> start bundle at any point, which is why the buffer continues to grow and is 
> never flushed?
> 
> I am going to try to recreate on a very simple test pipeline.
> 
> For reference, we are using Flink 1.8.0 and Apache BEAM 2.16 at the moment.
> 
> Many thanks,
> 
> Steve
> 
> 
> Stephen Hesketh
> 
> 
> -Original Message-
> From: Maximilian Michels [mailto:m...@apache.org] 
> Sent: 22 April 2020 20:38
> To: user@beam.apache.org; Hesketh, Stephen (Technology, NatWest Markets)
> Subject: Re: Apache beam job on Flink checkpoint size growing over time
> 
> 
> 
> Hi Steve,
> 
> The Flink Runner buffers data as part of the checkpoint. This 

Re: Unbounded stream to FileIO.write

2020-05-14 Thread Luke Cwik
You could still use streaming mode but you need to ensure that you use a
windowing strategy with a trigger that fires based upon processing time or
# records and not event time. This wouldn't require buffering all the data
as it is being read and would help with memory issues for runners that
don't have access to effectively infinite memory or offload to disk.
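
For example, a windowing strategy along these lines fires on processing time
and element count rather than the (never advancing) event-time watermark.
This is only a rough sketch with illustrative values, assuming the Beam Java
SDK and the Metric type from the pipeline quoted below:

import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Fire a pane for whatever has arrived, either 30s of processing time after
// the first element in the pane or once 10,000 elements have buffered,
// whichever comes first; the event-time watermark is never consulted.
metrics.apply(
    "window",
    Window.<Metric>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30)),
                    AfterPane.elementCountAtLeast(10_000))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());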

On Wed, May 13, 2020 at 5:04 PM Nathan Fisher 
wrote:

> Thanks Luke!
>
> When I started using the direct runner I was getting Out of Memory errors.
> I incorrectly thought toggling on streaming would help eliminate/minimise
> those errors. I started playing with windowing not realising a bounded IO
> would treat everything as the same window even with streaming on.
>
> I switched to the Flink runner after reading that the direct runner is
> actually intended to tease out pipeline issues. I’ve been using the Flink
> WebUI to monitor progress, which has been helpful for visualising what
> progress was made in the stream.
>
> What I would ultimately like to achieve is reading from Cassandra and
> writing the records to one or more Parquet files based on either count (Eg
> 10,000 records per file) and/or size on disk (Eg rotate to a new file when
> 64MB is exceeded).
>
> The environment I’m testing against right now is a relatively small test
> env with 100k records or so. Larger envs will be 1-100m.
>
> Regards,
> Nathan
>
> On Wed, May 13, 2020 at 19:25, Luke Cwik  wrote:
>
>> Since CassandraIO is not an unbounded source, the watermark will never
>> advance until all the data is read from Cassandra, which means that you
>> will buffer all the data if you (or any transform you use) have any event
>> time based windowing strategy when grouping. You could swap to use a
>> processing time windowing strategy (via Window.into) but it is unclear that
>> is what you want.
>>
>> CassandraIO is not an unbounded source so why do you want --streaming?
>> Also, why do you want to window.into fixed windows of 30 seconds?
>>
>>
>> On Tue, May 12, 2020 at 8:28 AM Nathan Fisher 
>> wrote:
>>
>>> Hi Folks,
>>>
>>> Cross-posting from the Slack channel from the other day.
>>>
>>> I started looking at Beam again over the weekend. I have an unbounded
>>> stream with a CassandraIO input and am trying to write files using
>>> FileIO and ParquetIO.
>>>
>>> I'm using the following:
>>>
>>> Beam: 2.20.0
>>> Flink Runner/Cluster: 1.9(.3)
>>>
>>> java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
>>> --sdkWorkerParallelism=0 --runner=FlinkRunner
>>>
>>> When submitting to a Flink cluster I include
>>> --flinkMaster=localhost:8081 in the command.
>>>
>>> If I replace the FileIO with a simple log writer it prints out the
>>> records and makes progress. Using the FileIO with ParquetIO it stalls
>>> on the stage write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
>>> ->
>>> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
>>> -> write/WriteFiles/GatherTempFileResults/Add void
>>> key/AddKeys/Map/ParMultiDo(Anonymous).
>>>
>>> That brings me to ask the following questions:
>>>
>>>1. What's the best way to test and monitor a beam pipeline?
>>>2. What adjustments are required to get this pipeline writing files?
>>>3. Is there some kind of way to evaluate the DAG and identify
>>>scenarios where this stall is likely?
>>>
>>>PipelineOptions pipelineOptions = PipelineOptionsFactory
>>> .fromArgs(args)
>>> .withValidation()
>>> .create();
>>> Pipeline p = Pipeline.create(pipelineOptions);
>>> CoderRegistry registry = p.getCoderRegistry();
>>> registry.registerCoderForClass(GenericRecord.class, AvroCoder.of(SCHEMA));
>>> PCollection<Metric> metrics = p.apply("cassandra",
>>> CassandraIO.<Metric>read()
>>> .withHosts(hosts)
>>> .withPort(9042)
>>> .withLocalDc("datacenter1")
>>> .withKeyspace(KEY_SPACE)
>>> .withTable(TABLE)
>>> .withMinNumberOfSplits(100)
>>> .withEntity(Metric.class)
>>> .withCoder(SerializableCoder.of(Metric.class)));
>>> metrics.apply("window",
>>> Window.into(
>>> FixedWindows.of(Duration.standardSeconds(30)))
>>> .withAllowedLateness(Duration.standardSeconds(5))
>>> .accumulatingFiredPanes())
>>> .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
>>> .apply("write", FileIO.<GenericRecord>write()
>>> .via(ParquetIO.sink(SCHEMA))
>>> .withNumShards(200)
>>> .to("./metrics/")
>>> .withPrefix("metrics")
>>> .withSuffix(".parquet"));
>>> p.run().waitUntilFinish();
>>>
>>> I also loaded this into a Flink cluster and it appears to stall on the
>>> temporary file sharding as outlined above and eventually fails after
>>> processing about 600-700k records.
>>>
>>> Rereading the windowing section in the document I changed it to

Re: Try Beam Katas Today

2020-05-14 Thread Nathan Fisher
Yes write IO

On Thu, May 14, 2020 at 05:41, Henry Suryawirawan 
wrote:

> Yeah, certainly we can expand it further.
> There are more lessons that can definitely be added.
>
> >Eg more the write side windowing interactions?
> Are you referring to Write IOs?
>
>
>
> On Wed, May 13, 2020 at 11:56 PM Nathan Fisher 
> wrote:
>
>> I went through them earlier this week! Definitely helpful.
>>
>> Is it possible to expand the katas available in the IO section? Eg more
>> the write side windowing interactions?
>>
>> On Wed, May 13, 2020 at 11:36, Luke Cwik  wrote:
>>
>>> These are an excellent learning tool.
>>>
>>> On Tue, May 12, 2020 at 11:02 PM Pablo Estrada 
>>> wrote:
>>>
 Sharing Damon's email with the user@ list as well. Thanks Damon!

 On Tue, May 12, 2020 at 9:02 PM Damon Douglas 
 wrote:

> Hello Everyone,
>
> If you don't already know, there are helpful instructional tools for
> learning the Apache Beam SDKs called Beam Katas hosted on
> https://stepik.org.  Similar to traditional Kata, they are meant to be repeated
> as practice.  Before practicing the katas myself, I found myself
> copy/pasting code (Please accept my confession  ).  Now I find myself
> actually composing pipelines.  Just like kata forms, you find them 
> becoming
> part of you.  If you are interested, below are listed the current 
> available
> katas:
>
> 1.  Java - https://stepik.org/course/54530
>
> 2.  Python -  https://stepik.org/course/54532
>
> 3.  Go (in development) - https://stepik.org/course/70387
>
> If you are absolutely brand new to Beam and it scares you like it
> scared me, come talk to me.
>
> Best,
>
> Damon
>
 --
>> Nathan Fisher
>>  w: http://junctionbox.ca/
>>
> --
Nathan Fisher
 w: http://junctionbox.ca/


Re: Try Beam Katas Today

2020-05-14 Thread Austin Bennett
It looks like there are instructions online for writing exercises/Katas:
https://www.jetbrains.com/help/education/educator-start-guide.html

Do we have a guide for contributing, and for how publication/releases occur
(publishing to Stepik)?  Although the code lives in the main repo
(and is therefore subject to those contrib guidelines), I think the
release/publication schedule is distinct?

This hopefully will help illustrate that we are able to contribute to Katas
(PRs welcome?), and not just consume them!



On Thu, May 14, 2020 at 1:41 AM Henry Suryawirawan 
wrote:

> Yeah, certainly we can expand it further.
> There are more lessons that can definitely be added.
>
> >Eg more the write side windowing interactions?
> Are you referring to Write IOs?
>
>
>
> On Wed, May 13, 2020 at 11:56 PM Nathan Fisher 
> wrote:
>
>> I went through them earlier this week! Definitely helpful.
>>
>> Is it possible to expand the katas available in the IO section? Eg more
>> the write side windowing interactions?
>>
>> On Wed, May 13, 2020 at 11:36, Luke Cwik  wrote:
>>
>>> These are an excellent learning tool.
>>>
>>> On Tue, May 12, 2020 at 11:02 PM Pablo Estrada 
>>> wrote:
>>>
 Sharing Damon's email with the user@ list as well. Thanks Damon!

 On Tue, May 12, 2020 at 9:02 PM Damon Douglas 
 wrote:

> Hello Everyone,
>
> If you don't already know, there are helpful instructional tools for
> learning the Apache Beam SDKs called Beam Katas hosted on
> https://stepik.org.  Similar to traditional Kata, they are meant to be repeated
> as practice.  Before practicing the katas myself, I found myself
> copy/pasting code (Please accept my confession  ).  Now I find myself
> actually composing pipelines.  Just like kata forms, you find them 
> becoming
> part of you.  If you are interested, below are listed the current 
> available
> katas:
>
> 1.  Java - https://stepik.org/course/54530
>
> 2.  Python -  https://stepik.org/course/54532
>
> 3.  Go (in development) - https://stepik.org/course/70387
>
> If you are absolutely brand new to Beam and it scares you like it
> scared me, come talk to me.
>
> Best,
>
> Damon
>
 --
>> Nathan Fisher
>>  w: http://junctionbox.ca/
>>
>


Re: KafkaIO.write dynamically fanout to different kafka topics

2020-05-14 Thread Alexey Romanenko
Hi Eleanore,

Yes, to define the output topic dynamically for every record, you may want to
use KafkaIO.writeRecords(), which takes a PCollection of ProducerRecord as an
input; for every processed ProducerRecord it takes its topic name (if one was
specified there) and uses it as the output topic. So, in this way, you can set
for every record the topic name where it will be published.

If I got your question right, you need an intermediate PTransform before
KafkaIO.writeRecords() that uses information from a message field to decide
the topic where your record should be published, and then creates a new
ProducerRecord with the proper topic name.
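
As a rough illustration only (the extractTopic() helper, the broker address,
and the KV<String, String> element type are all made up for the example):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Intermediate step: wrap each element in a ProducerRecord whose topic is
// derived from the message itself, then hand the records to writeRecords().
events
    .apply(
        "AssignTopic",
        ParDo.of(
            new DoFn<KV<String, String>, ProducerRecord<String, String>>() {
              @ProcessElement
              public void process(
                  @Element KV<String, String> element,
                  OutputReceiver<ProducerRecord<String, String>> out) {
                String topic = extractTopic(element.getValue()); // hypothetical helper
                out.output(
                    new ProducerRecord<>(topic, element.getKey(), element.getValue()));
              }
            }))
    .apply(
        "WriteToKafka",
        KafkaIO.<String, String>writeRecords()
            .withBootstrapServers("broker:9092")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));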

> On 14 May 2020, at 07:09, Eleanore Jin  wrote:
> 
> Hi all, 
> 
> I have a Beam pipeline which reads from a Kafka topic via KafkaIO and,
> based on a message field, adds an additional field to the message for the
> destination topic.
>
> I see KafkaIO.write can be used to publish to Kafka topics.
>
> In KafkaIO.java, it constructs the ProducerRecord, and getTopic() determines
> which topic to publish to; this information is passed when creating
> PTransforms via KafkaIO.write.
> 
> Any suggestions to dynamically set kafka topic from message field? 
> 
> Thanks a lot!
> Eleanore



Re: TextIO. Writing late files

2020-05-14 Thread Jose Manuel
Hi again,

I have simplified the example to reproduce the data loss. The scenario is the
following:

- TextIO writes files.
- getPerDestinationOutputFilenames emits file names.
- File names are processed by an aggregator (combine, distinct,
groupByKey...) with a window **without allowed lateness**.
- File names are discarded as late.

Here you can see the data loss in the picture in
https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
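
For readers who just want the failing shape, a condensed sketch (FileIO with a
text sink is used here purely for brevity; the project itself uses TextIO with
a custom FileNamePolicy, and "lines" stands for the generated unbounded
PCollection of strings):

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// 1) Windowed write; the result exposes the names of the files it produced.
WriteFilesResult<Void> written =
    lines
        .apply("window-data",
            Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
        .apply("write",
            FileIO.<String>write()
                .via(TextIO.sink())
                .to("pipe_with_lateness_0s/files")
                .withNumShards(1));

// 2) The emitted file names go through another event-time aggregation.
//    Without withAllowedLateness(...) on this window, the file names arrive
//    behind the watermark and are dropped by LateDataFilter.
written
    .getPerDestinationOutputFilenames()
    .apply("filenames", Values.<String>create())
    .apply("window-filenames",
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
    .apply("distinct-filenames", Distinct.<String>create());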

Please follow the README to run the pipeline and find the log traces that say
data is dropped as late.
Remember, you can run the pipeline with different window lateness values
(check README.md).

Kby.

On Tue, May 12, 2020 at 17:16, Jose Manuel ()
wrote:

> Hi,
>
> I would like to clarify that while TextIO is writing, all the data is in
> the files (shards). The loss happens when file names emitted by
> getPerDestinationOutputFilenames are processed by a window.
>
> I have created a pipeline to reproduce the scenario in which some
> filenames are lost after the getPerDestinationOutputFilenames. Please note
> I tried to simplify the code as much as possible, but the scenario is not
> easy to reproduce.
>
> Please check this project https://github.com/kiuby88/windowing-textio
> Check readme to build and run (
> https://github.com/kiuby88/windowing-textio#build-and-run)
> Project contains only a class with the pipeline PipelineWithTextIo,
> a log4j2.xml file in the resources and the pom.
>
> The pipeline in PipelineWithTextIo generates unbounded data using a
> sequence. It adds a little delay (10s) per data entry, it uses a distinct
> (just to apply the window), and then it writes data using TextIO.
> The window for the distinct is fixed (5 seconds) and it does not use
> lateness.
> Generated files can be found in
> windowing-textio/pipe_with_lateness_0s/files. To write files the
> FileNamePolicy uses window + timing + shards (see
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
> )
> Files are emitted using getPerDestinationOutputFilenames()
> (see the code here,
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
> )
>
> Then, file names in the PCollection are extracted and logged. Please note
> file names do not have pane information at that point.
>
> To apply a window, a distinct is used again. Here several file names are
> discarded as late and are not processed by this second distinct.
> Please, see
>
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>
> Debug is enabled for WindowTracing, so you can find in the terminal
> several messages like the following:
> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping
> element at 2020-05-12T14:05:14.999Z for
> key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
> window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
> behind inputWatermark:2020-05-12T14:05:19.799Z;
> outputWatermark:2020-05-12T14:05:19.799Z`
>
> What happens here? I think that messages are generated every second and a
> window of 5 seconds groups them. Then a delay is added and finally data is
> written to a file.
> The pipeline reads more data, increasing the watermark.
> Then, file names are emitted without pane information (see "Emitted File"
> in the logs). The window in the second distinct compares the file names'
> timestamps against the pipeline watermark and then discards the file names
> as late.
>
>
> Bonus
> -
> You can add a lateness to the pipeline. See
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>
> If a minute of lateness is added to the window, the file names are
> processed as late data instead of being dropped. As a result, the traces
> from LateDataFilter disappear.
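>
> In Beam terms the difference is just the allowed lateness on that second
> window. Roughly, as a sketch with illustrative values (where "filenames" is
> the PCollection of file name strings extracted from
> getPerDestinationOutputFilenames):
>
> filenames.apply(
>     "window-filenames",
>     Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>         .withAllowedLateness(Duration.standardMinutes(1))
>         .accumulatingFiredPanes());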
>
> Moreover, in order to better illustrate that the file names arrive late at
> the second distinct, I added a second TextIO to write the file names to
> other files.
> The same FileNamePolicy as before was used (window + timing + shards). Then,
> you can find files that contain the original filenames in
> windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
> interesting part, because you will find several files with LATE in their
> names.
>
> Please, let me know if you need more information or if the example is not
> enough to check the expected scenarios.
>
> Kby.
>
> On Sun, May 10, 2020 at 17:04, Reuven Lax ()
> wrote:
>
>> Pane info is supposed to be preserved across transforms. If the Fink
>> runner does not, than I believe that is a bug.
>>
>> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek 
>> wrote:
>>
>>> I am using FileIO and I do observe the drop of pane info information on
>>> Flink runner too. It was mentioned in this thread:
>>> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>>>
>>> It is a result of different 

Re: Try Beam Katas Today

2020-05-14 Thread Henry Suryawirawan
Yeah, certainly we can expand it further.
There are more lessons that can definitely be added.

>Eg more the write side windowing interactions?
Are you referring to Write IOs?



On Wed, May 13, 2020 at 11:56 PM Nathan Fisher 
wrote:

> I went through them earlier this week! Definitely helpful.
>
> Is it possible to expand the katas available in the IO section? Eg more
> the write side windowing interactions?
>
> On Wed, May 13, 2020 at 11:36, Luke Cwik  wrote:
>
>> These are an excellent learning tool.
>>
>> On Tue, May 12, 2020 at 11:02 PM Pablo Estrada 
>> wrote:
>>
>>> Sharing Damon's email with the user@ list as well. Thanks Damon!
>>>
>>> On Tue, May 12, 2020 at 9:02 PM Damon Douglas 
>>> wrote:
>>>
 Hello Everyone,

 If you don't already know, there are helpful instructional tools for
 learning the Apache Beam SDKs called Beam Katas hosted on
 https://stepik.org.  Similar to traditional Kata, they are meant to be repeated as
 practice.  Before practicing the katas myself, I found myself copy/pasting
 code (Please accept my confession  ).  Now I find myself actually
 composing pipelines.  Just like kata forms, you find them becoming part of
 you.  If you are interested, below are listed the current available katas:

 1.  Java - https://stepik.org/course/54530

 2.  Python -  https://stepik.org/course/54532

 3.  Go (in development) - https://stepik.org/course/70387

 If you are absolutely brand new to Beam and it scares you like it
 scared me, come talk to me.

 Best,

 Damon

>>> --
> Nathan Fisher
>  w: http://junctionbox.ca/
>


Re: Pattern for Enrichment Against Unbounded Source (Kafka)

2020-05-14 Thread Marcin Kuthan
On Mon, 4 May 2020 at 23:58, Rion Williams  wrote:

> Hi Luke,
>
> Thanks for the detailed response, it sounds like an avenue that I'd like
> to explore a bit further, although forgive me as I'm still quite new to
> Beam in general. I haven't written any stateful DoFns previously but I'd
> imagine it'd look something like this (what you are proposing that is):
>
> ```
> val pipeline = Pipeline.create(options)
>
> // Users
> val users = pipeline.apply("Read Users from Kafka",
> KafkaIO.read(options.usersTopic, options))
>
> // Additional entities omitted for brevity here
>
> pipeline
> .apply("Read Events from Kafka",
> KafkaIO.read(options.identifiedEventsTopic, options))
> .apply(Window.into(/* Unsure what to put here? */ ))
> .apply("Enrich Event for Users", ParDo.of(
> object: DoFn, KV>(){
> @StateId("user")
> private val state:
> SomeObjectToStoreStateAboutUsersAndOrEvents()
>
> @ProcessElement()
> fun processElement(context: ProcessContext,
> @StateId("user") userStateObject:
> SomeObjectToStoreStateAboutUsersAndOrEvents) {
> // Evaluate the incoming event
>
> // Enrich if we have that user
>
> // If enriched then output
>
> // Otherwise, store in state
> }
> }
> ))
> .apply("Eventually Write Enriched Events to Kafka",
> KafkaIO.write(options.enrichedEventsTopic, options))
> ```
>
>
The stateful DoFn presented above is oversimplified. State is needed for
users, but also for events whose related user has not been observed yet (or
you could window the events before the stateful DoFn).
Without a window (e.g. in the global window) the stateful DoFn also requires
some trigger to clean up the state.

Please look at this example:
https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/ScreenGlobalWindowWithLookupCacheEnricher.scala

The stream with screen events is similar to your event stream, and the
publication stream is similar to your user stream. Screen and publication are
joined in the global window (with an at-least-one-element count trigger)
using a stateful DoFn:
https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala

I hope this helps :)
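
For a rough idea of the shape in Java (a heavily simplified sketch, not the
LookupCacheDoFn linked above; the KV<String, String> element type and the
isUser()/enrich() helpers are made up for illustration):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Input: KV<entityId, payload>. Events seen before their user record is
// known are buffered in state and flushed once the user record arrives.
class EnrichFn extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("user")
  private final StateSpec<ValueState<String>> userSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @StateId("pendingEvents")
  private final StateSpec<BagState<String>> pendingSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("user") ValueState<String> userState,
      @StateId("pendingEvents") BagState<String> pending,
      OutputReceiver<KV<String, String>> out) {
    String key = element.getKey();
    String payload = element.getValue();

    if (isUser(payload)) {
      userState.write(payload);
      // Flush any events that were waiting for this user.
      for (String buffered : pending.read()) {
        out.output(KV.of(key, enrich(buffered, payload)));
      }
      pending.clear();
    } else {
      String user = userState.read();
      if (user != null) {
        out.output(KV.of(key, enrich(payload, user)));
      } else {
        // User not seen yet: buffer. In practice this also needs a timer or
        // trigger to clean up state, as mentioned above.
        pending.add(payload);
      }
    }
  }

  // Illustrative helpers; a real pipeline would inspect the actual schema.
  private boolean isUser(String payload) {
    return payload.startsWith("user:");
  }

  private String enrich(String event, String user) {
    return event + "|" + user;
  }
}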


> I'm not totally sure on the windowing usage yet or how/when the other
> streams come into play, so any advice there would be useful. Additionally -
> I have a few other questions if you have the time:
>
> - In this particular pipeline, users is a single entity, however I may
> potentially have multiple others. I'm assuming this would just require an
> additional stateful function per entity?
>

Look at the LookupCacheDoFn mentioned above; the implementation is fully
generic and can be reused for different entities. For sure, a separate
stateful DoFn instance is required for each entity type.


> - Some events may contain multiple user instances and thus require to be
> enriched multiple times from a single source. Is this a problem using this
> approach?
>
> - The current plan for this in a production environment would be to rely on
> Flink. I noticed that Flink has "partial" support for handling state, would
> this fall under that supported umbrella?
>
> Thanks so much for the advice Luke, I greatly appreciate it. Sorry for
> such a long winded question, I'm really excited about working with Beam,
> but the learning curve has been pretty steep (coming from an all-Kafka
> Kafka Streams world) thus far.
>
> Rion
>
> On 2020/05/04 16:11:59, Luke Cwik  wrote:
> > You can shard the side input based upon some prefix of the key (e.g. the
> > first byte of the key) into X shards, allowing each side input to be
> > smaller for runners that don't work well with map/multimap side inputs.
> > You should also take a look at the side input patterns[1] since they cover
> > slowly changing side inputs and I believe your use case has come up in the
> > past so going through these mailing threads[2] might help as well.
> >
> > Finally, you could also use a stateful DoFn instead of the side input.
> >
> > The stateful DoFn graph would create a single "object" type that contained
> > either an "update" message or a "needsEnrichment" message. Something like:
> > MainKafkaStream ---> Flatten -> Window.into(???) ->
> > StatefulDoFn(EnrichFn) -> ...
> > AdditionalInfoStream -/
> >
> > You need to ensure that the key you use for the stateful DoFn is the same
> > key for both the "update" and "needsEnrichment" message that you would
> > join on. This might require multiple enrichment fns if there isn't a
> > single key you can join on.
> >
> > The Window.into that is before the stateful DoFn would control when
> > "events" are allowed to progress. If you want to have them "synchronized"
> > on event time then you could use the default event time trigger which
> > would mean that update messages wouldn't be allowed to