Re: Callbacks/other functions run after a PDone/output transform

2017-12-01 Thread Chet Aldrich
So I agree generally with the idea that returning a PCollection makes all of 
this easier, since arbitrary additional functions can be applied afterward. But 
what exactly would write functions return in a PCollection that would make 
sense? The whole idea is that we’ve written to an external source and the 
collection itself is no longer needed. 

Currently that’s represented with a PDone, which doesn’t allow any work to 
occur after it. I see a couple of possible ways of handling this given this 
conversation, and am curious which sounds like the best way to deal with the 
problem:

1. Have output transforms always return something specific (the same across 
transforms, by convention) in the form of a PCollection, so operations can be 
applied after it (a rough sketch of this follows below). 

2. Make PDone, or some new type, able to act as a PCollection so we can run 
applies afterward. 

3. Make output transforms provide the facility for a callback function which 
runs after the transform is complete.
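
For concreteness, here is a rough sketch of what option 1 could look like. 
WriteToExternalStore is a made-up transform (not existing Beam API); the point 
is just that it returns a PCollection instead of PDone so further applies can 
hang off of it:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Illustrative only: a write-style transform that returns a PCollection
// (here, the documents it wrote) rather than PDone, so that follow-up
// steps (like an index move) can be applied to its output.
class WriteToExternalStore extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> docs) {
    return docs.apply("Write", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // ... write c.element() to the external store here ...
        c.output(c.element()); // emit something downstream steps can depend on
      }
    }));
  }
}

// Usage: the result can feed a GroupByKey/Combine whose output gates the
// post-write work, instead of the pipeline ending at a PDone:
// PCollection<String> written = docs.apply(new WriteToExternalStore());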

I went through these gymnastics recently when I was trying to build something 
that would move indices after writing to Algolia, and the solution was to 
co-opt code from the old Sink class that used to exist in Beam. The problem is 
that particular method requires the output transform in question
to return a PCollection, even if it is trivial or doesn’t make sense to return 
one. This seems like a bad solution, but unfortunately there isn’t a notion of 
a transform that has no explicit output yet still needs to have operations 
occur after it. 

The three potential solutions above address this issue, but I would like to 
hear which would be preferable (or perhaps a different proposal 
altogether?). Perhaps we could also start up a ticket on this, since it seems 
like a worthwhile feature addition. I would find it really useful, for one. 

Chet

> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik  wrote:
> 
> Instead of a callback fn, it's most useful if a PCollection is returned 
> containing the result of the sink so that any arbitrary additional functions 
> can be applied.
> 
> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré wrote:
> Agree, I would prefer to do the callback in the IO more than in the main.
> 
> Regards
> JB
> 
> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
> I do something almost exactly like this, but with BigtableIO instead.  I have 
> a pull request open here [1] (which reminds me I need to finish this up...).  
> It would really be nice for most IOs to support something like this.
> 
> Essentially you do a GroupByKey (or some CombineFn) on the output from the 
> BigtableIO, and then feed that into your function which will run when all 
> writes finish.
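> 
> The shape of it is roughly like this (illustrative only; the element type of 
> the write's output and the exact follow-up step depend on the IO and the 
> pipeline):
> 
> import org.apache.beam.sdk.transforms.Count;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> 
> // writeResults is whatever PCollection the write hands back (type varies by IO).
> static void runAfterWrites(PCollection<KV<String, String>> writeResults) {
>   writeResults
>       .apply(Count.<KV<String, String>>globally())  // one element once all writes are done (batch)
>       .apply(ParDo.of(new DoFn<Long, Void>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>           // all upstream writes have completed by the time this runs;
>           // kick off the "after all writes finished" work here
>         }
>       }));
> }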
> 
> You probably want to avoid doing something in the main method because there's 
> no guarantee it'll actually run (maybe the driver will die, get killed, 
> machine will explode, etc).
> 
> [1] https://github.com/apache/beam/pull/3997 
> 
> 
> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick wrote:
> 
> Assuming you're in Java, you could just follow on in your main method, 
> checking the state of the result.
> 
> Example:
> PipelineResult result = pipeline.run();
> try {
>   result.waitUntilFinish();
>   if (result.getState() == PipelineResult.State.DONE) {
>     // Do ES work
>   }
> } catch (Exception e) {
>   result.cancel();
>   throw e;
> }
> 
> Otherwise you could also use Oozie to construct a work flow.
> 
> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré wrote:
> 
> Hi,
> 
> yes, we had a similar question some days ago.
> 
> We can imagine having a user callback fn fired when the sink batch is
> complete.
> 
> Let me think about that.
> 
> Regards
> JB
> 
> On 12/01/2017 09:04 AM, Philip Chan wrote:
> 
> Hey JB,
> 
> Thanks for getting back so quickly.
> I suppose in that case I would need a way of monitoring when the ES
> transform completes successfully before I can proceed with doing the swap.
> The problem with this is that I can't think of a good way to determine that
> termination state short of polling the new index to check the document count
> compared to the size of input PCollection.
> That, or maybe I'd need to use an external system like you mentioned to poll
> on the state of the pipeline (I'm using Google Dataflow, so maybe there's a
> way to do this with some API).
> But I would have thought that there would be an easy way of simply saying "do
> not process this transform until this other t

Re: Using JDBC IO read transform, running out of memory on DataflowRunner.

2017-11-30 Thread Chet Aldrich
Hey Eugene, 

Thanks for this, didn’t realize this was a parameter I could tune. Fixed my 
problems straight away. 
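
For reference, what I ended up with is roughly the following -- setting the 
fetch size on the prepared statement through JdbcIO's statement preparator. 
The row type, data source configuration, mapper, coder, and the value 10,000 
are placeholders, and newer Beam versions may expose a more direct fetch-size 
option:

import java.sql.PreparedStatement;
import org.apache.beam.sdk.io.jdbc.JdbcIO;

// Ask the driver to stream results in batches instead of buffering the whole
// result set on the client, which is what was blowing up the workers.
JdbcIO.Read<MyRow> read =
    JdbcIO.<MyRow>read()
        .withDataSourceConfiguration(dataSourceConfig)
        .withQuery("SELECT * FROM big_table")
        .withStatementPreparator(
            (PreparedStatement statement) -> statement.setFetchSize(10_000))
        .withRowMapper(rowMapper)
        .withCoder(rowCoder);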

Chet

> On Nov 29, 2017, at 2:14 PM, Eugene Kirpichov  wrote:
> 
> Hi,
> I think you're hitting something that can be fixed by configuring the Redshift 
> driver:
> http://docs.aws.amazon.com/redshift/latest/dg/queries-troubleshooting.html#set-the-JDBC-fetch-size-parameter
> By default, the JDBC driver collects all the results for a query at one time. 
> As a result, when you attempt to retrieve a large result set over a JDBC 
> connection, you might encounter a client-side out-of-memory error. To enable 
> your client to retrieve result sets in batches instead of in a single 
> all-or-nothing fetch, set the JDBC fetch size parameter in your client 
> application.
> 
> On Wed, Nov 29, 2017 at 1:41 PM Chet Aldrich wrote:
> Hey all, 
> 
> I’m running a Dataflow job that uses the JDBC IO transform to pull in a bunch 
> of data (20mm rows, for reference) from Redshift, and I’m noticing that I’m 
> getting an OutofMemoryError on the Dataflow workers once I reach around 4mm 
> rows. 
> 
> It seems like, given the code that I’m reading inside JDBC IO and the guide here 
> (https://beam.apache.org/documentation/io/authoring-overview/#read-transforms), 
> that it’s just pulling the data in from the result one-by-one and then 
> emitting each output. Considering that this is sort of a limitation of the 
> driver, this makes sense, but is there a way I can get around the memory 
> limitation somehow? It seems like Dataflow repeatedly tries to create more 
> workers to handle the work, but it can’t, which is part of the problem. 
> 
> If more info is needed in order to help me sort out what I could do to not 
> run into the memory limitations I’m happy to provide it. 
> 
> 
> Thanks,
> 
> Chet 



Using JDBC IO read transform, running out of memory on DataflowRunner.

2017-11-29 Thread Chet Aldrich
Hey all, 

I’m running a Dataflow job that uses the JDBC IO transform to pull in a bunch 
of data (20mm rows, for reference) from Redshift, and I’m noticing that I’m 
getting an OutofMemoryError on the Dataflow workers once I reach around 4mm 
rows. 
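
The read is set up roughly like this (the row type, query, and connection 
details below are placeholders, not the exact code):

import java.sql.ResultSet;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

// Pull rows out of Redshift via JDBC and map each ResultSet row to a POJO.
// MyRow stands in for a simple Serializable value class.
PCollection<MyRow> rows =
    pipeline.apply(
        JdbcIO.<MyRow>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "com.amazon.redshift.jdbc.Driver",
                    "jdbc:redshift://host:5439/db"))
            .withQuery("SELECT id, payload FROM big_table")
            .withRowMapper((ResultSet rs) -> new MyRow(rs.getLong(1), rs.getString(2)))
            .withCoder(SerializableCoder.of(MyRow.class)));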

It seems like, given the code that I’m reading inside JDBC IO and the guide here 
(https://beam.apache.org/documentation/io/authoring-overview/#read-transforms), 
that it’s just pulling the data in from the result one-by-one and then emitting 
each output. Considering that this is sort of a limitation of the driver, this 
makes sense, but is there a way I can get around the memory limitation somehow? 
It seems like Dataflow repeatedly tries to create more workers to handle the 
work, but it can’t, which is part of the problem. 

If more info is needed in order to help me sort out what I could do to not run 
into the memory limitations I’m happy to provide it. 


Thanks,

Chet 

Re: Slack Channel

2017-11-16 Thread Chet Aldrich
If you wouldn’t mind I’d like an invite as well. 

Chet

> On Nov 16, 2017, at 4:58 PM, Jacob Marble  wrote:
> 
> Me too, if you don't mind.
> 
> Jacob
> 
> On Thu, Nov 9, 2017 at 2:09 PM, Lukasz Cwik wrote:
> Invite sent, welcome.
> 
> On Thu, Nov 9, 2017 at 2:08 PM, Fred Tsang wrote:
> Hi, 
> 
> Please add me to the slack channel. 
> 
> Thanks,
> Fred
> 
> Ps. I think "BeamTV" would be a great YouTube channel ;)
> 
> 



Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-16 Thread Chet Aldrich
Sure, I’d be happy to take it on. My JIRA ID is chetaldrich. We can continue 
discussion on that ticket. 

Chet

> On Nov 16, 2017, at 7:57 AM, Etienne Chauchot  wrote:
> 
> Chet, 
> FYI, here is the ticket and the design proposal: 
> https://issues.apache.org/jira/browse/BEAM-3201. If you still want to code 
> that improvement, give me your jira id and I will assign the ticket to you. 
> Otherwise I can code it as well.
> 
> Best
> 
> Etienne
> 
>> On 16/11/2017 at 09:19, Etienne Chauchot wrote:
>> Hi, 
>> Thanks for the offer, I'd be happy to review your PR. Just wait a bit until 
>> I have opened a proper ticket for that. I still need to think more about the 
>> design. Among other things, I have to check what ES dev team did for other 
>> big data ES IO (es_hadoop) on that particular point. Besides, I think we 
>> also need to deal with the id at read time not only at write time. I'll give 
>> some details in the ticket.
>> 
>> On 15/11/2017 at 20:08, Chet Aldrich wrote:
>>> Given that this seems like a change that should probably happen, and I’d 
>>> like to help contribute if possible, a few questions and my current 
>>> opinion: 
>>> 
>>> So I’m leaning towards approach B here, which is:
>>>> b. (a bit less user friendly) PCollection<KV<K, json>> with K as an id. But forces 
>>>> the user to do a ParDo before writing to ES to output KV pairs of <id, json>
>>>> 
>>> I think that the reduction in user-friendliness may be outweighed by the 
>>> fact that this obviates some of the issues surrounding a failure when 
>>> finishing a bundle. Additionally, this forces the user to provide a 
>>> document id, which I think is probably better practice.
>>> 
>> Yes as I wrote before, I think it is better to force the user to provide an 
>> id (at least for index updates; exactly-once semantics is a larger Beam 
>> subject than this IO scope). Regarding design, plan b is not the better one 
>> IMHO because it changes the IO public API. I'm more in favor of plan a with 
>> the ability for the user to tell what field is his doc id.
>>> This will also probably lead to fewer frustrations around “magic” code that 
>>> just pulls something in if it happens to be there, and doesn’t if not. 
>>> We’ll need to rely on the user catching this functionality in the docs or 
>>> the code itself to take advantage of it. 
>>> 
>>> IMHO it’d be generally better to enforce this at compile time because it 
>>> does have an effect on whether the pipeline produces duplicates on failure. 
>>> Additionally, we get the benefit of relatively intuitive behavior where if 
>>> the user passes in the same Key value, it’ll update a record in ES, and if 
>>> the key is different then it will create a new record. 
>> Totally agree, id enforcement at compile time, no auto-generation
>>> 
>>> Curious to hear thoughts on this. If this seems reasonable I’ll go ahead 
>>> and create a JIRA for tracking and start working on a PR for this. Also, if 
>>> it’d be good to loop in the dev mailing list before starting let me know, 
>>> I’m pretty new to this. 
>> I'll create the ticket and we will loop on design in the comments.
>> Best
>> Etienne
>>> 
>>> Chet 
>>> 
>>>> On Nov 15, 2017, at 12:53 AM, Etienne Chauchot wrote:
>>>> 
>>>> Hi Chet,
>>>> 
>>>> What you say is totally true, docs written using ElasticSearchIO will 
>>>> always have an ES generated id. But it might change in the future, indeed 
>>>> it might be a good thing to allow the user to pass an id. Just in 5 
>>>> seconds thinking, I see 3 possible designs for that. 
>>>> a.(simplest) use a json special field for the id, if it is provided by the 
>>>> user in the input json then it is used, auto-generated id otherwise.
>>>> 
>>>> b. (a bit less user friendly) PCollection<KV<K, json>> with K as an id. But forces 
>>>> the user to do a ParDo before writing to ES to output KV pairs of <id, json>
>>>> 
>>>> c. (a lot more complex) Allow the IO to serialize/deserialize java beans 
>>>> and have a String id field. Matching Java types to ES types is quite 
>>>> tricky, so, for now we just relied on the user to serialize his beans into 
>>>> json and let ES match the types automatically.
>>>> 
>>>> 

Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Chet Aldrich
Given that this seems like a change that should probably happen, and I’d like 
to help contribute if possible, a few questions and my current opinion: 

So I’m leaning towards approach B here, which is:
> b. (a bit less user friendly) PCollection<KV<K, json>> with K as an id. But forces the 
> user to do a ParDo before writing to ES to output KV pairs of <id, json>
> 
I think that the reduction in user-friendliness may be outweighed by the fact 
that this obviates some of the issues surrounding a failure when finishing a 
bundle. Additionally, this forces the user to provide a document id, which I 
think is probably better practice. This will also probably lead to fewer 
frustrations around “magic” code that just pulls something in if it happens to 
be there, and doesn’t if not. We’ll need to rely on the user catching this 
functionality in the docs or the code itself to take advantage of it. 

IMHO it’d be generally better to enforce this at compile time because it does 
have an effect on whether the pipeline produces duplicates on failure. 
Additionally, we get the benefit of relatively intuitive behavior where if the 
user passes in the same Key value, it’ll update a record in ES, and if the key 
is different then it will create a new record. 
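
For what it's worth, the user-side prep for approach B would be something like 
the ParDo below ahead of the write. The types, the extractIdField helper, and 
the idea that the write would accept KV pairs are all hypothetical here, since 
the IO doesn't take ids today:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Turn each raw JSON document into a KV of <document id, json>, so the write
// could use the key as the Elasticsearch _id instead of letting ES generate one.
PCollection<KV<String, String>> keyedDocs =
    jsonDocs.apply("KeyByDocId", ParDo.of(new DoFn<String, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String json = c.element();
        String id = extractIdField(json); // e.g. pull a known field out of the json
        c.output(KV.of(id, json));
      }
    }));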

Curious to hear thoughts on this. If this seems reasonable I’ll go ahead and 
create a JIRA for tracking and start working on a PR for this. Also, if it’d be 
good to loop in the dev mailing list before starting let me know, I’m pretty 
new to this. 

Chet 

> On Nov 15, 2017, at 12:53 AM, Etienne Chauchot wrote:
> 
> Hi Chet,
> 
> What you say is totally true, docs written using ElasticSearchIO will always 
> have an ES generated id. But it might change in the future, indeed it might 
> be a good thing to allow the user to pass an id. Just in 5 seconds thinking, 
> I see 3 possible designs for that. 
> a.(simplest) use a json special field for the id, if it is provided by the 
> user in the input json then it is used, auto-generated id otherwise.
> 
> b. (a bit less user friendly) PCollection<KV<K, json>> with K as an id. But forces the 
> user to do a ParDo before writing to ES to output KV pairs of <id, json>
> 
> c. (a lot more complex) Allow the IO to serialize/deserialize java beans and 
> have a String id field. Matching Java types to ES types is quite tricky, so, 
> for now we just relied on the user to serialize his beans into json and let 
> ES match the types automatically.
> 
> Related to the problems you raise below:
> 
> 1. Well, the bundle is the commit entity of beam. Consider the case of 
> ESIO.batchSize being smaller than the bundle size. While processing records, when the 
> number of elements reaches batchSize, an ES bulk insert will be issued but no 
> finishBundle. If there is a problem later on in the bundle processing before 
> the finishBundle, the checkpoint will still be at the beginning of the 
> bundle, so all the bundle will be retried leading to duplicate documents. 
> Thanks for raising that! I'm CCing the dev list so that someone could correct 
> me on the checkpointing mechanism if I'm missing something. Besides, I'm 
> thinking about forcing the user to provide an id in all cases to work around 
> this issue.
> 2. Correct.
> 
> Best,
> Etienne
> 
> On 15/11/2017 at 02:16, Chet Aldrich wrote:
>> Hello all! 
>> 
>> So I’ve been using the ElasticSearchIO sink for a project (unfortunately 
>> it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC) 
>> and I’m finding that it doesn’t allow for changing the document ID, but only 
>> lets you pass in a record, which means that the document ID is 
>> auto-generated. See this line for what specifically is happening: 
>> 
>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838
>> 
>> Essentially the data part of the document is being placed but it doesn’t 
>> allow for other properties, such as the document ID, to be set. 
>> 
>> This leads to two problems: 
>> 
>> 1. Beam doesn’t necessarily guarantee exactly-once execution for a given 
>> item in a PCollection, as I understand it. This means that you may get more 
>> than one record in Elastic for a given item in a PCollection that you pass 
>> in. 
>> 
>> 2. You can’t do partial updates to an index. If you run a batch job once, 
>> and then run the batch job again on the same index without clearing it, you 
>> just double everything in there. 
>> 
>> Is there any

Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-14 Thread Chet Aldrich
Hello all! 

So I’ve been using the ElasticSearchIO sink for a project (unfortunately it’s 
Elasticsearch 5.x, and so I’ve been messing around with the latest RC) and I’m 
finding that it doesn’t allow for changing the document ID, but only lets you 
pass in a record, which means that the document ID is auto-generated. See this 
line for what specifically is happening: 

https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838
 

 

Essentially the data part of the document is being placed but it doesn’t allow 
for other properties, such as the document ID, to be set. 
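
For context, the write today only takes the JSON document strings, so there's 
nowhere to hand it an id. Usage is roughly the following (connection details 
are placeholders):

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;

// jsonDocs is a PCollection<String> of documents already serialized to JSON;
// ES assigns the _id for each one on insert.
jsonDocs.apply(
    ElasticsearchIO.write()
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                new String[] {"http://localhost:9200"}, "my-index", "my-type")));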

This leads to two problems: 

1. Beam doesn’t necessarily guarantee exactly-once execution for a given item 
in a PCollection, as I understand it. This means that you may get more than one 
record in Elastic for a given item in a PCollection that you pass in. 

2. You can’t do partial updates to an index. If you run a batch job once, and 
then run the batch job again on the same index without clearing it, you just 
double everything in there. 

Is there any good way around this? 

I’d be happy to try writing up a PR for this in theory, but not sure how to 
best approach it. Also would like to figure out a way to get around this in the 
meantime, if anyone has any ideas. 

Best, 

Chet

P.S. CCed echauc...@gmail.com  because it seems 
like he’s been doing work related to the elastic sink.