Re: Unbounded stream to FileIO.write

2020-05-14 Thread Nathan Fisher
Awesome, thanks Luke! I mirrored a subset of the data (20k records) to a
local C* node and stripped out all of the windowing. It seems to be working
fine in batch mode. I think it was the combination of my inexperience with
Beam and network latency that ultimately had me pulling my hair out.

I started with the Direct Runner. Is there any guidance on the scale beyond
which you'd advise against using it? To be honest, the local Flink runner
was so easy that I'm surprised it isn't the default in the getting-started
documentation.

On Thu, May 14, 2020 at 1:31 PM Luke Cwik  wrote:

> You could still use streaming mode, but you need to ensure that you use a
> windowing strategy with a trigger that fires based on processing time or
> the number of records, not event time. That way the data wouldn't all be
> buffered as it is being read, which would help with memory issues on
> runners that don't have effectively infinite memory or don't offload to
> disk.
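>
> A minimal sketch of such a strategy (assuming the Metric element type from
> the pipeline below; the count and delay are illustrative, not
> recommendations):
>
>     metrics.apply("window",
>         Window.<Metric>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterFirst.of(
>                 // fire on whichever comes first: ~10k records or 30s of
>                 // processing time since the first element in the pane
>                 AfterPane.elementCountAtLeast(10_000),
>                 AfterProcessingTime.pastFirstElementInPane()
>                     .plusDelayOf(Duration.standardSeconds(30)))))
>             .discardingFiredPanes()
>             .withAllowedLateness(Duration.ZERO));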
>
> On Wed, May 13, 2020 at 5:04 PM Nathan Fisher 
> wrote:
>
>> Thanks Luke!
>>
>> When I started using the direct runner I was getting out-of-memory
>> errors. I incorrectly thought toggling on streaming would help
>> eliminate or minimise those errors. I started playing with windowing, not
>> realising that a bounded IO would put everything in the same window even
>> with streaming on.
>>
>> I switched to the Flink runner after reading that the direct runner is
>> really intended for teasing out pipeline issues. I’ve been using the Flink
>> WebUI to monitor progress, which has been helpful for visualising how far
>> the stream has progressed.
>>
>> What I would ultimately like to achieve is reading from Cassandra and
>> writing the records to one or more Parquet files, rotating on count (e.g.
>> 10,000 records per file) and/or size on disk (e.g. rotate to a new file
>> when 64 MB is exceeded).
>>
>> The environment I’m testing against right now is a relatively small test
>> env with 100k records or so. Larger environments will have 1-100M records.
>>
>> Regards,
>> Nathan
>>
>> On Wed, May 13, 2020 at 19:25, Luke Cwik  wrote:
>>
>>> Since CassandraIO is not an unbounded source, the watermark will never
>>> advance until all the data is read from Cassandra, which means that you
>>> will buffer all the data if you (or any transform you use) have any
>>> event-time-based windowing strategy when grouping. You could swap to a
>>> processing-time windowing strategy (via Window.into), but it is unclear
>>> whether that is what you want.
>>>
>>> CassandraIO is not an unbounded source, so why do you want --streaming?
>>> Also, why do you want to Window.into fixed windows of 30 seconds?
>>>
>>>
>>> On Tue, May 12, 2020 at 8:28 AM Nathan Fisher 
>>> wrote:
>>>
>>>> Hi Folks,
>>>>
>>>> Cross-posting from the Slack channel from the other day.
>>>>
>>>> I started looking at Beam again over the weekend. I have an unbounded
>>>> stream with a CassandraIO input and am trying to write files using
>>>> FileIO and ParquetIO.
>>>>
>>>> I'm using the following:
>>>>
>>>> Beam: 2.20.0
>>>> Flink Runner/Cluster: 1.9(.3)
>>>>
>>>> java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
>>>> --sdkWorkerParallelism=0 --runner=FlinkRunner
>>>>
>>>> When submitting to a Flink cluster I include
>>>> --flinkMaster=localhost:8081 in the command.
>>>>
>>>> If I replace the FileIO with a simple log writer it prints out the
>>>> records and makes progress. Using the FileIO with ParquetIO it stalls
>>>> on the stage 
>>>> write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
>>>> ->
>>>> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
>>>> -> write/WriteFiles/GatherTempFileResults/Add void
>>>> key/AddKeys/Map/ParMultiDo(Anonymous).
>>>>
>>>> That brings me to ask the following questions:
>>>>
>>>>1. What's the best way to test and monitor a Beam pipeline?
>>>>2. What adjustments are required to get this pipeline writing files?
>>>>3. Is there some kind of way to evaluate the DAG and identify
>>>>scenarios where this stall is likely?
>>>>
>>     PipelineOptions pipelineOptions = PipelineOptionsFactory
>>         .fromArgs(args)
>>         .withValidation()

Re: Try Beam Katas Today

2020-05-14 Thread Nathan Fisher
Yes, write IOs.

On Thu, May 14, 2020 at 05:41, Henry Suryawirawan 
wrote:

> Yeah, certainly we can expand it further.
> There are more lessons that can definitely be added.
>
> >E.g. more on the write-side windowing interactions?
> Are you referring to Write IOs?
>
>
>
> On Wed, May 13, 2020 at 11:56 PM Nathan Fisher 
> wrote:
>
>> I went through them earlier this week! Definitely helpful.
>>
>> Is it possible to expand the katas available in the IO section? E.g. more
>> on the write-side windowing interactions?
>>
>> On Wed, May 13, 2020 at 11:36, Luke Cwik  wrote:
>>
>>> These are an excellent learning tool.
>>>
>>> On Tue, May 12, 2020 at 11:02 PM Pablo Estrada 
>>> wrote:
>>>
>>>> Sharing Damon's email with the user@ list as well. Thanks Damon!
>>>>
>>>> On Tue, May 12, 2020 at 9:02 PM Damon Douglas 
>>>> wrote:
>>>>
>>>>> Hello Everyone,
>>>>>
>>>>> If you don't already know, there are helpful instructional tools for
>>>>> learning the Apache Beam SDKs called Beam Katas hosted on
>>>>> https://stepik.org.  Similar to traditional Kata
>>>>> <https://en.wikipedia.org/wiki/Kata>, they are meant to be repeated
>>>>> as practice.  Before practicing the katas myself, I found myself
>>>>> copy/pasting code (please accept my confession).  Now I find myself
>>>>> actually composing pipelines.  Just like kata forms, you find them
>>>>> becoming part of you.  If you are interested, the currently available
>>>>> katas are listed below:
>>>>>
>>>>> 1.  Java - https://stepik.org/course/54530
>>>>>
>>>>> 2.  Python -  https://stepik.org/course/54532
>>>>>
>>>>> 3.  Go (in development) - https://stepik.org/course/70387
>>>>>
>>>>> If you are absolutely brand new to Beam and it scares you like it
>>>>> scared me, come talk to me.
>>>>>
>>>>> Best,
>>>>>
>>>>> Damon
>>>>>
>>>> --
>> Nathan Fisher
>>  w: http://junctionbox.ca/
>>
> --
Nathan Fisher
 w: http://junctionbox.ca/


Re: Unbounded stream to FileIO.write

2020-05-13 Thread Nathan Fisher
Thanks Luke!

When I started using the direct runner I was getting out-of-memory errors.
I incorrectly thought toggling on streaming would help eliminate or minimise
those errors. I started playing with windowing, not realising that a bounded
IO would put everything in the same window even with streaming on.

I switched to the Flink runner after reading that the direct runner is
really intended for teasing out pipeline issues. I’ve been using the Flink
WebUI to monitor progress, which has been helpful for visualising how far
the stream has progressed.

What I would ultimately like to achieve is reading from Cassandra and
writing the records to one or more Parquet files, rotating on count (e.g.
10,000 records per file) and/or size on disk (e.g. rotate to a new file
when 64 MB is exceeded).
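
A rough sketch of the count-based part, using an element-count trigger on
the global window (as far as I can tell FileIO has no direct
max-records-per-file setting, and the 64 MB size-based rotation isn't
covered here; names like Metric, MetricToGeneric and SCHEMA are from the
pipeline quoted below):

    metrics
        .apply("window", Window.<Metric>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10_000)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
        // aim for one shard per fired pane, so each ~10k-record pane lands
        // in roughly one file
        .apply("write", FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(SCHEMA))
            .withNumShards(1)
            .to("./metrics/")
            .withPrefix("metrics")
            .withSuffix(".parquet"));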

The environment I’m testing against right now is a relatively small test
env with 100k records or so. Larger environments will have 1-100M records.

Regards,
Nathan

On Wed, May 13, 2020 at 19:25, Luke Cwik  wrote:

> Since CassandraIO is not an unbounded source, the watermark will never
> advance until all the data is read from Cassandra, which means that you
> will buffer all the data if you (or any transform you use) have any
> event-time-based windowing strategy when grouping. You could swap to a
> processing-time windowing strategy (via Window.into), but it is unclear
> whether that is what you want.
>
> CassandraIO is not an unbounded source, so why do you want --streaming?
> Also, why do you want to Window.into fixed windows of 30 seconds?
>
>
> On Tue, May 12, 2020 at 8:28 AM Nathan Fisher 
> wrote:
>
>> Hi Folks,
>>
>> Cross-posting from the Slack channel from the other day.
>>
>> I started looking at Beam again over the weekend. I have an unbounded
>> stream with a CassandraIO input and am trying to write files using FileIO
>> and ParquetIO.
>>
>> I'm using the following:
>>
>> Beam: 2.20.0
>> Flink Runner/Cluster: 1.9(.3)
>>
>> java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
>> --sdkWorkerParallelism=0 --runner=FlinkRunner
>>
>> When submitting to a Flink cluster I include --flinkMaster=localhost:8081
>> in the command.
>>
>> If I replace the FileIO with a simple log writer it prints out the
>> records and makes progress. Using the FileIO with ParquetIO it stalls on
>> the stage write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
>> ->
>> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
>> -> write/WriteFiles/GatherTempFileResults/Add void
>> key/AddKeys/Map/ParMultiDo(Anonymous).
>>
>> That brings me to ask the following questions:
>>
>>    1. What's the best way to test and monitor a Beam pipeline?
>>2. What adjustments are required to get this pipeline writing files?
>>3. Is there some kind of way to evaluate the DAG and identify
>>scenarios where this stall is likely?
>>
>>     PipelineOptions pipelineOptions = PipelineOptionsFactory
>>         .fromArgs(args)
>>         .withValidation()
>>         .create();
>>     Pipeline p = Pipeline.create(pipelineOptions);
>>     CoderRegistry registry = p.getCoderRegistry();
>>     registry.registerCoderForClass(GenericRecord.class, AvroCoder.of(SCHEMA));
>>
>>     PCollection<Metric> metrics = p.apply("cassandra",
>>         CassandraIO.<Metric>read()
>>             .withHosts(hosts)
>>             .withPort(9042)
>>             .withLocalDc("datacenter1")
>>             .withKeyspace(KEY_SPACE)
>>             .withTable(TABLE)
>>             .withMinNumberOfSplits(100)
>>             .withEntity(Metric.class)
>>             .withCoder(SerializableCoder.of(Metric.class)));
>>
>>     metrics.apply("window",
>>             Window.<Metric>into(FixedWindows.of(Duration.standardSeconds(30)))
>>                 .withAllowedLateness(Duration.standardSeconds(5))
>>                 .accumulatingFiredPanes())
>>         .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
>>         .apply("write", FileIO.<GenericRecord>write()
>>             .via(ParquetIO.sink(SCHEMA))
>>             .withNumShards(200)
>>             .to("./metrics/")
>>             .withPrefix("metrics")
>>             .withSuffix(".parquet"));
>>
>>     p.run().waitUntilFinish();
>>
>> I also loaded this into a Flink cluster and it appears to stall on the
>> temporary file sharding as outlined above and eventually fails after
>> processing about 600-700k records.
>>
>> Rereading the windowing section in the documentation, I changed it to
>> discardingFiredPanes(), as that seems the more appropriate behaviour for
>> what I want, but it doesn't appear to have changed the results.
>>
>> Regards,
>> --
>> Nathan Fisher
>>  w: http://junctionbox.ca/
>>
> --
Nathan Fisher
 w: http://junctionbox.ca/


Unbounded stream to FileIO.write

2020-05-12 Thread Nathan Fisher
Hi Folks,

Cross-posting from the Slack channel from the other day.

I started looking at Beam again over the weekend. I have an unbounded
stream with a CassandraIO input and am trying to write files using FileIO
and ParquetIO.

I'm using the following:

Beam: 2.20.0
Flink Runner/Cluster: 1.9(.3)

java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
--sdkWorkerParallelism=0 --runner=FlinkRunner

When submitting to a Flink cluster I include --flinkMaster=localhost:8081
in the command.

If I replace the FileIO with a simple log writer it prints out the records
and makes progress. Using the FileIO with ParquetIO it stalls on the
stage write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
->
write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> write/WriteFiles/GatherTempFileResults/Add void
key/AddKeys/Map/ParMultiDo(Anonymous).
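
(The simple log writer mentioned above is essentially a DoFn along the lines
of the following sketch; the names are illustrative rather than the exact
code.)

    metrics
        .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
        .apply("log", ParDo.of(new DoFn<GenericRecord, Void>() {
          @ProcessElement
          public void processElement(@Element GenericRecord record) {
            // log each converted record instead of writing files
            LOG.info("record: {}", record);
          }
        }));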

That brings me to ask the following questions:

   1. What's the best way to test and monitor a Beam pipeline?
   2. What adjustments are required to get this pipeline writing files?
   3. Is there some kind of way to evaluate the DAG and identify scenarios
   where this stall is likely?

    PipelineOptions pipelineOptions = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .create();
    Pipeline p = Pipeline.create(pipelineOptions);
    CoderRegistry registry = p.getCoderRegistry();
    registry.registerCoderForClass(GenericRecord.class, AvroCoder.of(SCHEMA));

    PCollection<Metric> metrics = p.apply("cassandra",
        CassandraIO.<Metric>read()
            .withHosts(hosts)
            .withPort(9042)
            .withLocalDc("datacenter1")
            .withKeyspace(KEY_SPACE)
            .withTable(TABLE)
            .withMinNumberOfSplits(100)
            .withEntity(Metric.class)
            .withCoder(SerializableCoder.of(Metric.class)));

    metrics.apply("window",
            Window.<Metric>into(FixedWindows.of(Duration.standardSeconds(30)))
                .withAllowedLateness(Duration.standardSeconds(5))
                .accumulatingFiredPanes())
        .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
        .apply("write", FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(SCHEMA))
            .withNumShards(200)
            .to("./metrics/")
            .withPrefix("metrics")
            .withSuffix(".parquet"));

    p.run().waitUntilFinish();

I also loaded this into a Flink cluster and it appears to stall on the
temporary file sharding as outlined above and eventually fails after
processing about 600-700k records.

Rereading the windowing section in the documentation, I changed it to
discardingFiredPanes(), as that seems the more appropriate behaviour for
what I want, but it doesn't appear to have changed the results.
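
With that change, the window transform looks like this (a sketch; the
downstream metricToGeneric and write steps are unchanged):

    metrics.apply("window",
        Window.<Metric>into(FixedWindows.of(Duration.standardSeconds(30)))
            .withAllowedLateness(Duration.standardSeconds(5))
            .discardingFiredPanes());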

Regards,
-- 
Nathan Fisher
 w: http://junctionbox.ca/