Re: Achieving MapState Equivalent in Dataflow Runner

2020-02-14 Thread Ziyad Muhammed
Thanks Luke! I will try that.

Best
Ziyad


On Thu, Feb 13, 2020 at 6:16 PM Luke Cwik  wrote:

> If the map/set can fit in memory then you can use a value state containing
> a Java Map/Set.
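A minimal sketch of this ValueState-of-Map approach, assuming a hypothetical
Event type with an eventID getter and a matching EventCoder (neither comes
from this thread); the per-user map is read, updated, and written back as a
whole:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.coders.MapCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Stateful DoFn keyed by UserID; the state cell holds a Map of eventID -> Event.
    class EventsByUserFn extends DoFn<KV<String, Event>, KV<String, Event>> {

      @StateId("eventsById")
      private final StateSpec<ValueState<Map<String, Event>>> eventsSpec =
          StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), EventCoder.of()));

      @ProcessElement
      public void process(
          @Element KV<String, Event> element,
          @StateId("eventsById") ValueState<Map<String, Event>> events,
          OutputReceiver<KV<String, Event>> out) {
        // Read the whole map, update the entry for this eventID, write it back.
        Map<String, Event> byId = events.read();
        if (byId == null) {
          byId = new HashMap<>();
        }
        byId.put(element.getValue().getEventId(), element.getValue());
        events.write(byId);
        out.output(element);
      }
    }

Lookups by eventID then become a read() of the map followed by a get(), at
the cost of deserializing the whole map on each access, which is why this
only works when the per-user map fits comfortably in memory.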
>
> On Thu, Feb 13, 2020 at 5:05 AM Ziyad Muhammed  wrote:
>
>> Hi All
>>
>> I'm developing a Beam pipeline to be run on Dataflow.
>>
>> I see that MapState and SetState are both not yet supported by the
>> Dataflow runner. For my use case, I need a state per UserID that can
>> hold multiple events, which I would like to organize by an eventID, a
>> timestamp, or a combination of both.
>>
>> What is the best way to achieve this per-user state holding multiple
>> events that I can look up by a second ID? Is there a better way than
>> using a BagState and iterating each time?
>>
>>
>> Best
>> Ziyad
>>
>


Achieving MapState Equivalent in Dataflow Runner

2020-02-13 Thread Ziyad Muhammed
Hi All

I'm developing a Beam pipeline to be run on Dataflow.

I see that MapState and SetState are both not yet supported by the Dataflow
runner. For my use case, I need a state per UserID that can hold multiple
events, which I would like to organize by an eventID, a timestamp, or a
combination of both.

What is the best way to achieve this per-user state holding multiple events
that I can look up by a second ID? Is there a better way than using a
BagState and iterating each time?


Best
Ziyad


Re: AvroIO Windowed Writes - Number of files to specify

2019-09-12 Thread Ziyad Muhammed
Hi Cham

Any update on this?

Best
Ziyad


On Thu, Sep 5, 2019 at 5:43 PM Ziyad Muhammed  wrote:

> Hi Cham
>
> I tried that before. Apparently it's not accepted by either the direct
> runner or the Dataflow runner. I get the error below:
>
> Exception in thread "main" java.lang.IllegalArgumentException: When
>> applying WriteFiles to an unbounded PCollection, must specify number of
>> output shards explicitly
>> at
>> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:299)
>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:109)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1519)
>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1155)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1659)
>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1541)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>
>
>
>
> Best
> Ziyad
>
>
> On Wed, Sep 4, 2019 at 6:45 PM Chamikara Jayalath 
> wrote:
>
>> Do you mean the value to specify for the number of shards to write [1]?
>>
>> For this, I think it's better not to specify any value, which will give
>> the runner the most flexibility.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
>>
>> On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed  wrote:
>>
>>> Hi all
>>>
>>> I have a Beam pipeline running on Cloud Dataflow that produces Avro
>>> files on GCS. The window duration is 1 minute, and the job currently runs
>>> with 64 cores (16 * n1-standard-4). The data produced per minute is
>>> around 2 GB.
>>>
>>> Is there any recommendation on the number of Avro files to specify?
>>> Currently I'm using 64 (to match the number of cores). Will a very
>>> high number help in increasing the write throughput?
>>> I saw that BigQueryIO with FILE_LOADS uses a default of 1000
>>> files.
>>>
>>> I tried some random values, but couldn't infer a pattern for when it is
>>> more performant.
>>>
>>> Any suggestion is hugely appreciated.
>>>
>>> Best
>>> Ziyad
>>>
>>


Re: AvroIO Windowed Writes - Number of files to specify

2019-09-05 Thread Ziyad Muhammed
Hi Cham

I tried that before. Apparently it's not accepted by either the direct
runner or the Dataflow runner. I get the error below:

Exception in thread "main" java.lang.IllegalArgumentException: When
> applying WriteFiles to an unbounded PCollection, must specify number of
> output shards explicitly
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:299)
> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:109)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1519)
> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1155)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1659)
> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1541)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>
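For context, a hedged sketch of the kind of windowed AvroIO write involved
here, with the shard count set explicitly as the check demands; the record
type, output prefix, and shard count are illustrative, not taken from this
thread:

    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // For an unbounded input, WriteFiles/AvroIO insists on an explicit shard count.
    static void writeWindowedAvro(PCollection<MyRecord> records) {
      records
          .apply(Window.<MyRecord>into(FixedWindows.of(Duration.standardMinutes(1))))
          .apply(
              AvroIO.write(MyRecord.class)
                  .to("gs://my-bucket/output/part")   // hypothetical output prefix
                  .withSuffix(".avro")
                  .withWindowedWrites()
                  .withNumShards(64));                // explicit shards: required for unbounded input
    }

Leaving withNumShards unset (or setting it to 0) is what triggers the
IllegalArgumentException quoted above.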



Best
Ziyad


On Wed, Sep 4, 2019 at 6:45 PM Chamikara Jayalath 
wrote:

> Do you mean the value to specify for the number of shards to write [1]?
>
> For this, I think it's better not to specify any value, which will give
> the runner the most flexibility.
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
>
> On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed  wrote:
>
>> Hi all
>>
>> I have a Beam pipeline running on Cloud Dataflow that produces Avro
>> files on GCS. The window duration is 1 minute, and the job currently runs
>> with 64 cores (16 * n1-standard-4). The data produced per minute is
>> around 2 GB.
>>
>> Is there any recommendation on the number of Avro files to specify?
>> Currently I'm using 64 (to match the number of cores). Will a very
>> high number help in increasing the write throughput?
>> I saw that BigQueryIO with FILE_LOADS uses a default of 1000
>> files.
>>
>> I tried some random values, but couldn't infer a pattern for when it is
>> more performant.
>>
>> Any suggestion is hugely appreciated.
>>
>> Best
>> Ziyad
>>
>


AvroIO Windowed Writes - Number of files to specify

2019-09-04 Thread Ziyad Muhammed
Hi all

I have a Beam pipeline running on Cloud Dataflow that produces Avro files
on GCS. The window duration is 1 minute, and the job currently runs with
64 cores (16 * n1-standard-4). The data produced per minute is around 2 GB.

Is there any recommendation on the number of Avro files to specify?
Currently I'm using 64 (to match the number of cores). Will a very
high number help in increasing the write throughput?
I saw that BigQueryIO with FILE_LOADS uses a default of 1000 files.

I tried some random values, but couldn't infer a pattern for when it is
more performant.

Any suggestion is hugely appreciated.

Best
Ziyad


Re: Cost efficient loading of Kafka high throughput event stream to Bigquery

2019-08-13 Thread Ziyad Muhammed
(Better late than never) Thank you, Jeff!

I could achieve the required throughput for Kafka to Avro files on GCS by
changing the worker disk type to SSD; a disk size of 100 GB was enough.
Even with these changes, I couldn't achieve the throughput for direct writes
to BigQuery using FILE_LOADS.

Best
Ziyad


On Fri, May 17, 2019 at 3:24 PM Jeff Klukas  wrote:

> I have also wrestled with throughput for FileIO and BigQueryIO on
> Dataflow, and in my case the bottleneck came down to disk I/O throughput on
> the worker machines. Writing with FileIO or BigQueryIO involves several
> group by key operations that in the Dataflow case require checkpointing
> state to disk.
>
> Moving to SSD on the workers ended up being much more important in our
> case vs. tuning sharding and windowing. Window durations of 1, 5, or 10
> minutes all sound reasonable, and I don't expect them to have a big impact on
> observed performance, since I believe Dataflow is checkpointing to disk at the
> bundle level and it's not necessary that the whole window fit in memory.
> You should have numShards at least as high as your number of workers.
>
> Try SSD (workerDiskType='
> compute.googleapis.com/projects//zones//diskTypes/pd-ssd') and
> diskSizeGb=500. Last I checked, 500 GB maximized the per-worker disk I/O,
> according to the Dataflow docs.
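A hedged sketch of setting these worker options programmatically; <project>
and <zone> are placeholders for the elided parts of the disk-type path:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Build Dataflow options with SSD persistent disks and a 500 GB disk per worker.
    static DataflowPipelineOptions ssdWorkerOptions(String[] args) {
      DataflowPipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
      options.setWorkerDiskType(
          "compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd");
      options.setDiskSizeGb(500);
      return options;
    }

The same settings can also be passed as the pipeline flags --workerDiskType
and --diskSizeGb.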
>
> On Fri, May 17, 2019 at 7:07 AM Ziyad Muhammed  wrote:
>
>> Hi,
>>
>> I have a Kafka event stream that produces 80k messages per second.
>> Messages are in protobuf format and are roughly 800 bytes in size. I need
>> this event data to be loaded into a BigQuery table using Beam/Dataflow.
>>
>> The choice of streaming inserts was discarded due to the high cost (for
>> the above-mentioned throughput, the insertion cost itself was estimated to
>> be ~$9k).
>>
>> The two other options are to use the FILE_LOADS API to directly load the
>> data as TableRows, or to write Avro files to GCS and do a scheduled load to
>> BigQuery (for example, using Airflow).
>>
>> I tried both options, but couldn't get the desired throughput with up to
>> 64 worker cores (e.g. 16 * n1-standard-4 or 64 * n1-standard-1). The Kafka
>> topic has 16 partitions.
>>
>> For FILE_LOADS, I tried numShards of 1000 or more and a triggering
>> frequency of 2, 5, or 10 minutes.
>> For AvroIO, I'm missing the recommended values for the input parameters:
>> 1. What window duration should I use? (I was using a 1-minute window, to
>> avoid having too many elements in memory. Latency of the pipeline is not a
>> big concern.)
>> 2. What is the recommended number of shards? (num_shards = 0, so that the
>> system decides, didn't work for the Dataflow runner.)
>> 3. Should I customize the GCS upload buffer size?
>> 4. I was choosing the number of worker nodes proportional to the number
>> of Kafka topics. Is that the right approach? Which kind of machines, and
>> how many, are suitable for this use case?
>>
>> And in general, is it at all possible to do this ingest to BigQuery in
>> any batch fashion at a lower cost than streaming inserts?
>>
>> Any inputs are highly appreciated!
>>
>> Best
>> Ziyad
>>
>


Cost efficient loading of Kafka high throughput event stream to Bigquery

2019-05-17 Thread Ziyad Muhammed
Hi,

I have a Kafka event stream that produces 80k messages per second. Messages
are in protobuf format and are roughly 800 bytes in size. I need this event
data to be loaded into a BigQuery table using Beam/Dataflow.

The choice of streaming inserts was discarded due to the high cost (for the
above-mentioned throughput, the insertion cost itself was estimated to be
~$9k).

The two other options are to use the FILE_LOADS API to directly load the
data as TableRows, or to write Avro files to GCS and do a scheduled load to
BigQuery (for example, using Airflow).

I tried both options, but couldn't get the desired throughput with up to 64
worker cores (e.g. 16 * n1-standard-4 or 64 * n1-standard-1). The Kafka
topic has 16 partitions.

For FILE_LOADS, I tried numShards of 1000 or more and a triggering
frequency of 2, 5, or 10 minutes.
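For reference, a hedged sketch of a FILE_LOADS configuration along these
lines; the destination table, schema handling, and the exact shard count and
triggering frequency are illustrative, not taken from this thread:

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Batch-style load jobs from a streaming pipeline instead of streaming inserts.
    static void writeWithFileLoads(PCollection<TableRow> rows, TableSchema schema) {
      rows.apply(
          BigQueryIO.writeTableRows()
              .to("my-project:my_dataset.events")                 // hypothetical table
              .withSchema(schema)
              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
              .withTriggeringFrequency(Duration.standardMinutes(5))
              .withNumFileShards(1000)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
    }

With FILE_LOADS on an unbounded input, both withTriggeringFrequency and
withNumFileShards need to be set.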
For AvroIO, I'm missing the recommended values for the input parameters:
1. What window duration should I use? (I was using a 1-minute window, to
avoid having too many elements in memory. Latency of the pipeline is not a
big concern.)
2. What is the recommended number of shards? (num_shards = 0, so that the
system decides, didn't work for the Dataflow runner.)
3. Should I customize the GCS upload buffer size?
4. I was choosing the number of worker nodes proportional to the number of
Kafka topics. Is that the right approach? Which kind of machines, and how
many, are suitable for this use case?

And in general, is it at all possible to do this ingest to BigQuery in any
batch fashion at a lower cost than streaming inserts?

Any inputs are highly appreciated!

Best
Ziyad


Re: Advice for piping many CSVs with different column names to one BigQuery table

2018-09-28 Thread Ziyad Muhammed
Hi Eila,

I'm not sure I understand the complexity of your problem.
If you do not have to perform any transformation on the data inside the CSVs
and just need to load them into BigQuery, isn't it enough to use bq load with
schema auto-detection?
https://cloud.google.com/bigquery/docs/schema-detect
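For example, a single file can be loaded with schema auto-detection from the
command line roughly like this (dataset, table, and bucket names are
placeholders):

    bq load --autodetect --source_format=CSV mydataset.mytable gs://my-bucket/sample-001.csv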

Best
Ziyad


On Thu, Sep 27, 2018 at 9:35 PM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Thank you!
> Probably around 50.
>
> Best,
> Eila
>
> On Thu, Sep 27, 2018 at 1:23 AM Ankur Goenka  wrote:
>
>> Hi Eila,
>>
>> That seems reasonable to me.
>>
>> Here is a reference on writing to BQ
>> https://github.com/apache/beam/blob/1ffba44f7459307f5a134b8f4ea47ddc5ca8affc/sdks/python/apache_beam/examples/complete/game/leader_board.py#L326
>>
>> May I know how many distinct columns you are expecting across all files?
>>
>>
>> On Wed, Sep 26, 2018 at 8:06 PM OrielResearch Eila Arich-Landkof <
>> e...@orielresearch.org> wrote:
>>
>>> Hi Ankur / users,
>>>
>>> I would like to make sure that the suggested pipeline can work for my
>>> needs.
>>>
>>> So, additional clarification:
>>> - The CSV files have a few common and a few different columns. Each CSV
>>> file represents a sample's measurement record.
>>>
>>> - When the CSVs are merged together, I expect to have one table with the
>>> combined columns from all samples. Will your suggested pipeline allow that?
>>>
>>> - If I understand correctly the following pipeline:
>>>
>>> *Read files => Parse lines => Generate pCollections for each column =>
>>> GroupBy column name => Write to BQ*
>>>
>>> *Read files:* will generate a pCollection from each CSV line (= file)
>>> *Parse lines:* will extract the column name and its matching value
>>> (generating a tuple)
>>> *Generate pCollection for each column:* will generate a pCollection
>>> from the tuples
>>> *GroupBy:* will merge each column name with all the relevant samples'
>>> values (does it need to know the column names to group by, or will it
>>> automatically use the tuple key?)
>>> *WriteToBQ:* will NOT (is that true?) be able to write the values
>>> matching the relevant sample; samples that didn't have a value for a
>>> specific key will get values from other samples that are available in
>>> that column's collection
>>>
>>> - If the above understanding is correct, would extracting and merging the
>>> headers from all 2.5M CSV files in advance to derive the schema, and then
>>> using an additional pipeline to populate the data to BQ with that schema,
>>> be the "right" approach?
>>>
>>> Please let me know if I miss something here / what your thoughts are
>>> Many thanks,
>>> Eila
>>>
>>> On Wed, Sep 26, 2018 at 12:04 PM OrielResearch Eila Arich-Landkof <
>>> e...@orielresearch.org> wrote:
>>>
 Hi Ankur,

 Thank you. Trying this approach now. Will let you know if I have any
 issue implementing it.
 Best,
 Eila

 On Tue, Sep 25, 2018 at 7:19 PM Ankur Goenka  wrote:

> Hi Eila,
>
> If I understand correctly, the objective is to read a large number of
> CSV files, each of which contains a single row with multiple columns.
> Deduplicate the columns in the files and write them to BQ.
> You are using a pandas DF to deduplicate the columns for a small set of
> files, which might not work for a large number of files.
>
> You can use a Beam GroupBy to deduplicate the columns and write them to
> BigQuery. Beam is capable of reading and managing a large number of files
> when given the path to the directory containing those files.
> So the approach would be ->
> Read files => Parse lines => Generate pCollections for each column =>
> GroupBy column name => Write to BQ
> For reference here is an example of reading file and doing groupby
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py
>
> Note: I am not very familiar with BQ so can't think of any direct
> approach to dump data to BQ.
>
> Thanks,
> Ankur
>
>
> On Tue, Sep 25, 2018 at 12:13 PM OrielResearch Eila Arich-Landkof <
> e...@orielresearch.org> wrote:
>
>> Hello,
>> I would like to write a large number of CSV files to BQ, where the
>> headers from all of them are aggregated into one common header. Any advice
>> is very appreciated.
>>
>> The details are:
>> 1. 2.5M CSV files
>> 2. Each CSV file: a header of 50-60 columns
>> 3. Each CSV file: one data row
>>
>> There are common columns between the CSV files, but I don't know them in
>> advance. I would like to have all the CSV files in one BigQuery table.
>>
>> My current method:
>> When there was a smaller number of files, I read the CSV files and
>> appended them to one pandas DataFrame that was written to a file
>> (total.csv). total.csv was the input to the Beam pipeline.
>>
>> small CSVs => Pandas DF => total CSV => pCollection => Big Query
>>
>> The challenge with that 

Re: Regarding Beam Slack Channel

2017-12-01 Thread Ziyad Muhammed
Me too, thanks in advance.

Best
Ziyad

On Fri, Dec 1, 2017 at 10:54 AM,  wrote:

> Hi
> Can I receive this invitation, too?
>
> Thanks
> Rick
>
> -Original Message-
> From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
> Sent: Friday, December 01, 2017 12:53 PM
> To: user@beam.apache.org
> Subject: Re: Regarding Beam Slack Channel
>
> Invite sent as well.
>
> Regards
> JB
>
> On 11/30/2017 07:19 PM, Yanael Barbier wrote:
> > Hello
> > Can I get an invite too?
> >
> > Thanks,
> > Yanael
> >
> > On Thu, Nov 30, 2017 at 7:15 PM, Wesley Tanaka  wrote:
> >
> > Invite sent
> >
> >
> > On 11/30/2017 08:11 AM, Nalseez Duke wrote:
> >> Hello
> >>
> >> Can someone please add me to the Beam slack channel?
> >>
> >> Thanks.
> >
> >
> > --
> > Wesley Tanaka
> > https://wtanaka.com/
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
> --
> This email may contain confidential information. Please do not use or
> disclose it in any way and delete it if you are not the intended recipient.
>