Re: Batch loads in streaming pipeline - withNumFileShards

2017-11-15 Thread Lukasz Cwik
Filed https://issues.apache.org/jira/browse/BEAM-3198 for the IllegalArgumentException.
Do you mind posting a little code snippet of how you build the BQ IO connector on BEAM-3198?




On Wed, Nov 15, 2017 at 12:18 PM, Arpan Jain  wrote:

> Hi,
>
> I am trying to use Method.FILE_LOADS for loading data into BQ in my
> streaming pipeline using the RC3 release of 2.2.0. It looks
> like withNumFileShards also needs to be set to use this. A couple of
> questions regarding this:
>
> * Any guidelines on what's a good value for this? FWIW my pipeline is
> going to write > 100K messages/second to around 500 tables
> using DynamicDestinations. I am trying to set a triggering frequency of 30
> minutes or 1 hour.
>
> * Can we throw a better error message when it is not set? The exception
> thrown by the pipeline if this is not set is "Exception in thread "main"
> java.lang.IllegalArgumentException" without any message.
>


Batch loads in streaming pipeline - withNumFileShards

2017-11-15 Thread Arpan Jain
Hi,

I am trying to use Method.FILE_LOADS for loading data into BQ in my
streaming pipeline using the RC3 release of 2.2.0. It looks
like withNumFileShards also needs to be set to use this. A couple of
questions regarding this:

* Any guidelines on what's a good value for this? FWIW my pipeline is going
to write > 100K messages/second to around 500 tables
using DynamicDestinations. I am trying to set a triggering frequency of 30
minutes or 1 hour.

* Can we throw a better error message when it is not set? The exception
thrown by the pipeline if this is not set is "Exception in thread "main"
java.lang.IllegalArgumentException" without any message.
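
For context, here is a trimmed sketch of the kind of configuration I mean (the
table spec, schema and the shard count of 1000 are illustrative placeholders,
not values from my actual pipeline):

  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.joda.time.Duration;

  // rows is a PCollection<TableRow>; schema is a TableSchema defined elsewhere
  rows.apply("WriteToBQ", BigQueryIO.writeTableRows()
      .to("my-project:my_dataset.my_table")   // or a DynamicDestinations implementation
      .withSchema(schema)
      .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
      .withTriggeringFrequency(Duration.standardMinutes(30))  // start load jobs every 30 minutes
      .withNumFileShards(1000)                // must be set when using FILE_LOADS in a streaming pipeline
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));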


Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Tim Robertson
Hi Chet,

I'll be a user of this, so thank you.

It seems reasonable, although did you consider letting folks name the
document ID field explicitly? It would avoid an unnecessary transformation
and might be simpler:


  // instruct the writer to use a provided document ID
  ElasticsearchIO.write()
      .withConnectionConfiguration(conn)
      .withMaxBatchSize(BATCH_SIZE)
      .withDocumentIdField("myID");
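
And roughly how that might be used end to end, as a sketch only (the
withDocumentIdField option is the proposal above, not something in the released
IO; the field name and sample documents are made up, and pipeline, conn and
BATCH_SIZE are assumed to exist as in the snippet):

  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.values.PCollection;

  // each input element is a JSON string that already carries the named id field
  PCollection<String> docs = pipeline.apply(Create.of(
      "{\"myID\":\"doc-1\",\"user\":\"alice\"}",
      "{\"myID\":\"doc-2\",\"user\":\"bob\"}"));

  docs.apply(ElasticsearchIO.write()
      .withConnectionConfiguration(conn)
      .withMaxBatchSize(BATCH_SIZE)
      .withDocumentIdField("myID"));   // proposed option, not yet in the IO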


On Wed, Nov 15, 2017 at 8:08 PM, Chet Aldrich 
wrote:

> Given that this seems like a change that should probably happen, and I’d
> like to help contribute if possible, a few questions and my current
> opinion:
>
> So I’m leaning towards approach B here, which is:
>
> b. (a bit less user friendly) PCollection<KV<K, V>> with K as an id. But forces
> the user to do a ParDo before writing to ES to output KV pairs of <id, json>
>
> I think that the reduction in user-friendliness may be outweighed by the
> fact that this obviates some of the issues surrounding a failure when
> finishing a bundle. Additionally, this *forces* the user to provide a
> document id, which I think is probably better practice. This will also
> probably lead to fewer frustrations around “magic” code that just pulls
> something in if it happens to be there, and doesn’t if not. We’ll need to
> rely on the user catching this functionality in the docs or the code itself
> to take advantage of it.
> IMHO it’d be generally better to enforce this at compile time because it
> does have an effect on whether the pipeline produces duplicates on failure.
> Additionally, we get the benefit of relatively intuitive behavior where if
> the user passes in the same Key value, it’ll update a record in ES, and if
> the key is different then it will create a new record.
>
> Curious to hear thoughts on this. If this seems reasonable I’ll go ahead
> and create a JIRA for tracking and start working on a PR for this. Also, if
> it’d be good to loop in the dev mailing list before starting let me know,
> I’m pretty new to this.
>
> Chet
>
> On Nov 15, 2017, at 12:53 AM, Etienne Chauchot 
> wrote:
>
> Hi Chet,
>
> What you say is totally true, docs written using ElasticSearchIO will
> always have an ES generated id. But it might change in the future, indeed
> it might be a good thing to allow the user to pass an id. Just in 5 seconds
> thinking, I see 3 possible designs for that.
>
> a. (simplest) use a special json field for the id: if it is provided by the
> user in the input json then it is used, auto-generated id otherwise.
>
> b. (a bit less user friendly) PCollection<KV<K, V>> with K as an id. But forces
> the user to do a ParDo before writing to ES to output KV pairs of <id, json>
>
> c. (a lot more complex) Allow the IO to serialize/deserialize java beans
> and have a String id field. Matching java types to ES types is quite
> tricky, so, for now we just relied on the user to serialize his beans into
> json and let ES match the types automatically.
>
> Related to the problems you raise below:
>
> 1. Well, the bundle is the commit entity of beam. Consider the case of
> ESIO.batchSize being smaller than the bundle size. While processing records, when the
> number of elements reaches batchSize, an ES bulk insert will be issued but
> no finishBundle. If there is a problem later on in the bundle processing
> before the finishBundle, the checkpoint will still be at the beginning of
> the bundle, so all the bundle will be retried leading to duplicate
> documents. Thanks for raising that! I'm CCing the dev list so that someone
> could correct me on the checkpointing mechanism if I'm missing something.
> Besides, I'm thinking about forcing the user to provide an id in all cases
> to work around this issue.
> 2. Correct.
>
> Best,
> Etienne
>
> On 15/11/2017 at 02:16, Chet Aldrich wrote:
>
> Hello all!
>
> So I’ve been using the ElasticSearchIO sink for a project (unfortunately
> it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC)
> and I’m finding that it doesn’t allow for changing the document ID, but
> only lets you pass in a record, which means that the document ID is
> auto-generated. See this line for what specifically is happening:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838
>
> Essentially the data part of the document is being placed but it doesn’t
> allow for other properties, such as the document ID, to be set.
>
> This leads to two problems:
>
> 1. Beam doesn’t necessarily guarantee exactly-once execution for a given
> item in a PCollection, as I understand it. This means that you may get more
> than one record in Elastic for a given item in a PCollection that you pass
> in.
>
> 2. You can’t do partial updates to an index. If you run a batch job once,
> and then run the batch job again on the same index without clearing it, you
> just double everything in there.
>
> Is there any good way around this?
>
> I’d be 

Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Chet Aldrich
Given that this seems like a change that should probably happen, and I’d like 
to help contribute if possible, a few questions and my current opinion: 

So I’m leaning towards approach B here, which is:
> b. (a bit less user friendly) PCollection<KV<K, V>> with K as an id. But forces the
> user to do a ParDo before writing to ES to output KV pairs of <id, json>
> 
I think that the reduction in user-friendliness may be outweighed by the fact 
that this obviates some of the issues surrounding a failure when finishing a 
bundle. Additionally, this forces the user to provide a document id, which I 
think is probably better practice. This will also probably lead to fewer 
frustrations around “magic” code that just pulls something in if it happens to 
be there, and doesn’t if not. We’ll need to rely on the user catching this 
functionality in the docs or the code itself to take advantage of it. 

IMHO it’d be generally better to enforce this at compile time because it does 
have an effect on whether the pipeline produces duplicates on failure. 
Additionally, we get the benefit of relatively intuitive behavior where if the 
user passes in the same Key value, it’ll update a record in ES, and if the key 
is different then it will create a new record. 
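
For illustration, the kind of ParDo that option (b) would force users to write
might look roughly like this (a sketch only: it assumes Jackson is on the
classpath, that each document carries an "id" attribute, and a keyed
ElasticsearchIO.write() that accepts KV<String, String>, which doesn't exist yet):

  import java.io.IOException;
  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
  import org.apache.beam.sdk.values.KV;

  // Key each JSON document by one of its own fields so the writer can use it as the ES _id.
  static class KeyByDocId extends DoFn<String, KV<String, String>> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      String json = c.element();
      JsonNode node = MAPPER.readTree(json);
      c.output(KV.of(node.get("id").asText(), json));
    }
  }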

Curious to hear thoughts on this. If this seems reasonable I’ll go ahead and 
create a JIRA for tracking and start working on a PR for this. Also, if it’d be 
good to loop in the dev mailing list before starting let me know, I’m pretty 
new to this. 

Chet 

> On Nov 15, 2017, at 12:53 AM, Etienne Chauchot wrote:
> 
> Hi Chet,
> 
> What you say is totally true, docs written using ElasticSearchIO will always 
> have an ES generated id. But it might change in the future, indeed it might 
> be a good thing to allow the user to pass an id. Just in 5 seconds thinking, 
> I see 3 possible designs for that. 
> a. (simplest) use a special json field for the id: if it is provided by the
> user in the input json then it is used, auto-generated id otherwise.
>
> b. (a bit less user friendly) PCollection<KV<K, V>> with K as an id. But forces the
> user to do a ParDo before writing to ES to output KV pairs of <id, json>
> 
> c. (a lot more complex) Allow the IO to serialize/deserialize java beans and
> have a String id field. Matching java types to ES types is quite tricky, so,
> for now we just relied on the user to serialize his beans into json and let 
> ES match the types automatically.
> 
> Related to the problems you raise below:
> 
> 1. Well, the bundle is the commit entity of beam. Consider the case of 
> ESIO.batchSize being smaller than the bundle size. While processing records, when the
> number of elements reaches batchSize, an ES bulk insert will be issued but no 
> finishBundle. If there is a problem later on in the bundle processing before 
> the finishBundle, the checkpoint will still be at the beginning of the 
> bundle, so all the bundle will be retried leading to duplicate documents. 
> Thanks for raising that! I'm CCing the dev list so that someone could correct 
> me on the checkpointing mechanism if I'm missing something. Besides, I'm
> thinking about forcing the user to provide an id in all cases to work around
> this issue.
> 2. Correct.
> 
> Best,
> Etienne
> 
> On 15/11/2017 at 02:16, Chet Aldrich wrote:
>> Hello all! 
>> 
>> So I’ve been using the ElasticSearchIO sink for a project (unfortunately 
>> it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC) 
>> and I’m finding that it doesn’t allow for changing the document ID, but only 
>> lets you pass in a record, which means that the document ID is 
>> auto-generated. See this line for what specifically is happening: 
>> 
>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838
>>
>> Essentially the data part of the document is being placed but it doesn’t 
>> allow for other properties, such as the document ID, to be set. 
>> 
>> This leads to two problems: 
>> 
>> 1. Beam doesn’t necessarily guarantee exactly-once execution for a given 
>> item in a PCollection, as I understand it. This means that you may get more 
>> than one record in Elastic for a given item in a PCollection that you pass 
>> in. 
>> 
>> 2. You can’t do partial updates to an index. If you run a batch job once, 
>> and then run the batch job again on the same index without clearing it, you 
>> just double everything in there. 
>> 
>> Is there any good way around this? 
>> 
>> I’d be happy to try writing up a PR for this in theory, but not sure how to 
>> best approach it. Also would like to figure out a way to get around this in 
>> the meantime, if anyone has any ideas. 
>> 
>> Best, 
>> 
>> Chet
>> 
>> P.S. CCed echauc...@gmail.com because it seems like he's been doing work
>> related to the elastic sink.

Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Etienne Chauchot
Yes, exactly. Actually, it arose from a discussion we had with Romain
about ESIO.



On 15/11/2017 at 10:08, Jean-Baptiste Onofré wrote:
I think it's also related to the discussion Romain raised on the dev 
mailing list (gap between batch size, checkpointing & bundles).


Regards
JB

On 11/15/2017 09:53 AM, Etienne Chauchot wrote:

Hi Chet,

What you say is totally true, docs written using ElasticSearchIO will 
always have an ES generated id. But it might change in the future, 
indeed it might be a good thing to allow the user to pass an id. Just 
in 5 seconds thinking, I see 3 possible designs for that.


a. (simplest) use a special json field for the id: if it is provided
by the user in the input json then it is used, auto-generated id
otherwise.


b. (a bit less user friendly) PCollection<KV<K, V>> with K as an id. But
forces the user to do a ParDo before writing to ES to output KV pairs
of <id, json>


c. (a lot more complex) Allow the IO to serialize/deserialize java
beans and have a String id field. Matching java types to ES types is
quite tricky, so, for now we just relied on the user to serialize his 
beans into json and let ES match the types automatically.


Related to the problems you raise below:

1. Well, the bundle is the commit entity of beam. Consider the case 
of ESIO.batchSize being smaller than the bundle size. While processing records,
when the number of elements reaches batchSize, an ES bulk insert will 
be issued but no finishBundle. If there is a problem later on in the 
bundle processing before the finishBundle, the checkpoint will still 
be at the beginning of the bundle, so all the bundle will be retried 
leading to duplicate documents. Thanks for raising that! I'm CCing 
the dev list so that someone could correct me on the checkpointing 
mechanism if I'm missing something. Besides, I'm thinking about forcing
the user to provide an id in all cases to work around this issue.


2. Correct.

Best,
Etienne

On 15/11/2017 at 02:16, Chet Aldrich wrote:

Hello all!

So I’ve been using the ElasticSearchIO sink for a project 
(unfortunately it’s Elasticsearch 5.x, and so I’ve been messing 
around with the latest RC) and I’m finding that it doesn’t allow for 
changing the document ID, but only lets you pass in a record, which 
means that the document ID is auto-generated. See this line for what 
specifically is happening:


https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838 



Essentially the data part of the document is being placed but it 
doesn’t allow for other properties, such as the document ID, to be set.


This leads to two problems:

1. Beam doesn’t necessarily guarantee exactly-once execution for a 
given item in a PCollection, as I understand it. This means that you 
may get more than one record in Elastic for a given item in a 
PCollection that you pass in.


2. You can’t do partial updates to an index. If you run a batch job 
once, and then run the batch job again on the same index without 
clearing it, you just double everything in there.


Is there any good way around this?

I’d be happy to try writing up a PR for this in theory, but not sure 
how to best approach it. Also would like to figure out a way to get 
around this in the meantime, if anyone has any ideas.


Best,

Chet

P.S. CCed echauc...@gmail.com  because 
it seems like he’s been doing work related to the elastic sink.

Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Tim Robertson
Hi Chet,

+1 for interest in this from me too.

If it helps, I'd have expected a) to be the implementation (e.g. something
like "_id" being used if present) and handling multiple delivery being a
responsibility of the developer.
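
For example, a rough sketch of what I mean (assumptions: Jackson on the
classpath, a made-up "recordId" source field, and an ElasticsearchIO that would
honour an "_id" attribute when present, which is only the proposal here):

  import java.io.IOException;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.fasterxml.jackson.databind.node.ObjectNode;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TypeDescriptors;

  // Copy a field of each document into "_id" so the (proposed) writer can use it as the document id.
  PCollection<String> withIds = docs.apply(MapElements
      .into(TypeDescriptors.strings())
      .via((String json) -> {
        try {
          ObjectNode node = (ObjectNode) new ObjectMapper().readTree(json);
          node.put("_id", node.get("recordId").asText());
          return node.toString();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }));

  withIds.apply(ElasticsearchIO.write().withConnectionConfiguration(conn));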

Thanks,
Tim




On Wed, Nov 15, 2017 at 10:08 AM, Jean-Baptiste Onofré 
wrote:

> I think it's also related to the discussion Romain raised on the dev
> mailing list (gap between batch size, checkpointing & bundles).
>
> Regards
> JB
>
> On 11/15/2017 09:53 AM, Etienne Chauchot wrote:
>
>> Hi Chet,
>>
>> What you say is totally true, docs written using ElasticSearchIO will
>> always have an ES generated id. But it might change in the future, indeed
>> it might be a good thing to allow the user to pass an id. Just in 5 seconds
>> thinking, I see 3 possible designs for that.
>>
>> a. (simplest) use a special json field for the id: if it is provided by
>> the user in the input json then it is used, auto-generated id otherwise.
>>
>> b. (a bit less user friendly) PCollection<KV<K, V>> with K as an id. But forces
>> the user to do a ParDo before writing to ES to output KV pairs of <id, json>
>>
>> c. (a lot more complex) Allow the IO to serialize/deserialize java beans
>> and have a String id field. Matching java types to ES types is quite
>> tricky, so, for now we just relied on the user to serialize his beans into
>> json and let ES match the types automatically.
>>
>> Related to the problems you raise below:
>>
>> 1. Well, the bundle is the commit entity of beam. Consider the case of
>> ESIO.batchSize being smaller than the bundle size. While processing records, when the
>> number of elements reaches batchSize, an ES bulk insert will be issued but
>> no finishBundle. If there is a problem later on in the bundle processing
>> before the finishBundle, the checkpoint will still be at the beginning of
>> the bundle, so all the bundle will be retried leading to duplicate
>> documents. Thanks for raising that! I'm CCing the dev list so that someone
>> could correct me on the checkpointing mechanism if I'm missing something.
>> Besides, I'm thinking about forcing the user to provide an id in all cases
>> to work around this issue.
>>
>> 2. Correct.
>>
>> Best,
>> Etienne
>>
>> On 15/11/2017 at 02:16, Chet Aldrich wrote:
>>
>>> Hello all!
>>>
>>> So I’ve been using the ElasticSearchIO sink for a project (unfortunately
>>> it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC)
>>> and I’m finding that it doesn’t allow for changing the document ID, but
>>> only lets you pass in a record, which means that the document ID is
>>> auto-generated. See this line for what specifically is happening:
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838
>>>
>>> Essentially the data part of the document is being placed but it doesn’t
>>> allow for other properties, such as the document ID, to be set.
>>>
>>> This leads to two problems:
>>>
>>> 1. Beam doesn’t necessarily guarantee exactly-once execution for a given
>>> item in a PCollection, as I understand it. This means that you may get more
>>> than one record in Elastic for a given item in a PCollection that you pass
>>> in.
>>>
>>> 2. You can’t do partial updates to an index. If you run a batch job
>>> once, and then run the batch job again on the same index without clearing
>>> it, you just double everything in there.
>>>
>>> Is there any good way around this?
>>>
>>> I’d be happy to try writing up a PR for this in theory, but not sure how
>>> to best approach it. Also would like to figure out a way to get around this
>>> in the meantime, if anyone has any ideas.
>>>
>>> Best,
>>>
>>> Chet
>>>
>>> P.S. CCed echauc...@gmail.com  because it
>>> seems like he’s been doing work related to the elastic sink.
>>>
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Jean-Baptiste Onofré
I think it's also related to the discussion Romain raised on the dev mailing 
list (gap between batch size, checkpointing & bundles).


Regards
JB

On 11/15/2017 09:53 AM, Etienne Chauchot wrote:

Hi Chet,

What you say is totally true, docs written using ElasticSearchIO will always 
have an ES generated id. But it might change in the future, indeed it might be a 
good thing to allow the user to pass an id. Just in 5 seconds thinking, I see 3 
possible designs for that.


a. (simplest) use a special json field for the id: if it is provided by the user
in the input json then it is used, auto-generated id otherwise.


b. (a bit less user friendly) PCollection<KV<K, V>> with K as an id. But forces the
user to do a ParDo before writing to ES to output KV pairs of <id, json>


c. (a lot more complex) Allow the IO to serialize/deserialize java beans and
have a String id field. Matching java types to ES types is quite tricky, so,
for now we just relied on the user to serialize his beans into json and let ES 
match the types automatically.


Related to the problems you raise below:

1. Well, the bundle is the commit entity of beam. Consider the case of 
ESIO.batchSize being smaller than the bundle size. While processing records, when the number
of elements reaches batchSize, an ES bulk insert will be issued but no 
finishBundle. If there is a problem later on in the bundle processing before the 
finishBundle, the checkpoint will still be at the beginning of the bundle, so 
all the bundle will be retried leading to duplicate documents. Thanks for 
raising that! I'm CCing the dev list so that someone could correct me on the 
checkpointing mechanism if I'm missing something. Besides, I'm thinking about
forcing the user to provide an id in all cases to work around this issue.


2. Correct.

Best,
Etienne

On 15/11/2017 at 02:16, Chet Aldrich wrote:

Hello all!

So I’ve been using the ElasticSearchIO sink for a project (unfortunately it’s 
Elasticsearch 5.x, and so I’ve been messing around with the latest RC) and I’m 
finding that it doesn’t allow for changing the document ID, but only lets you 
pass in a record, which means that the document ID is auto-generated. See this 
line for what specifically is happening:


https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838

Essentially the data part of the document is being placed but it doesn’t allow 
for other properties, such as the document ID, to be set.


This leads to two problems:

1. Beam doesn’t necessarily guarantee exactly-once execution for a given item 
in a PCollection, as I understand it. This means that you may get more than 
one record in Elastic for a given item in a PCollection that you pass in.


2. You can’t do partial updates to an index. If you run a batch job once, and 
then run the batch job again on the same index without clearing it, you just 
double everything in there.


Is there any good way around this?

I’d be happy to try writing up a PR for this in theory, but not sure how to 
best approach it. Also would like to figure out a way to get around this in 
the meantime, if anyone has any ideas.


Best,

Chet

P.S. CCed echauc...@gmail.com  because it seems 
like he’s been doing work related to the elastic sink.

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: [DISCUSS] Drop Spark 1.x support to focus on Spark 2.x

2017-11-15 Thread Jean-Baptiste Onofré

Any additional feedback about that?

I will update the thread with the two branches later today: one with Spark
1.x & 2.x support, and one with the Spark 2.x upgrade.


Thanks
Regards
JB

On 11/13/2017 09:32 AM, Jean-Baptiste Onofré wrote:

Hi Beamers,

I'm forwarding this discussion & vote from the dev mailing list to the user 
mailing list.

The goal is to have your feedback as users.

Basically, we have two options:
1. Right now, in the PR, we support both Spark 1.x and 2.x using three artifacts
(common, spark1, spark2). You, as users, pick spark1 or spark2 in your
dependency set depending on the target Spark version you want.
2. The other option is to upgrade and focus on Spark 2.x in Beam 2.3.0. If you
still want to use Spark 1.x, then you will be stuck at Beam 2.2.0.


Thoughts?

Thanks !
Regards
JB


 Forwarded Message 
Subject: [VOTE] Drop Spark 1.x support to focus on Spark 2.x
Date: Wed, 8 Nov 2017 08:27:58 +0100
From: Jean-Baptiste Onofré 
Reply-To: d...@beam.apache.org
To: d...@beam.apache.org

Hi all,

as you might know, we are working on Spark 2.x support in the Spark runner.

I'm working on a PR about that:

https://github.com/apache/beam/pull/3808

Today, we have something working with both Spark 1.x and 2.x from a code
standpoint, but I have to deal with dependencies. It's the first step of the
update, as I'm still using RDDs; the second step would be to support DataFrames
(but for that, I would need PCollection elements with schemas, which is another
topic Eugene, Reuven and I are discussing).


However, as all major distributions now ship Spark 2.x, I don't think it's 
required anymore to support Spark 1.x.


If we agree, I will update and clean up the PR to only support and focus on Spark
2.x.


So, that's why I'm calling for a vote:

   [ ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only
   [ ] 0 (I don't care ;))
   [ ] -1, I would like to still support Spark 1.x, and so have support of
both Spark 1.x and 2.x (please provide a specific comment)


This vote is open for 48 hours (I have the commits ready, just waiting for the end
of the vote to push to the PR).


Thanks !
Regards
JB


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com