Re: ElasticSearch with RestHighLevelClient

2017-10-25 Thread Ryan Bobko
Hi Etienne,
That was just the hint I needed! Until Beam 2.2 is released, my
solution is to keep a handle to the low-level client and close it
in the teardown method. This seems to solve the threading issue.
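
For the record, a minimal sketch of that workaround (the index name and type
are placeholders, and it assumes the elements are already JSON strings):

import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class WriteFn extends DoFn<String, Void> {

  private final String elasticurl;
  // keep a handle to the low-level client so it can be closed explicitly
  private transient RestClient lowLevelClient;
  private transient RestHighLevelClient client;

  public WriteFn( String elasticurl ) {
    this.elasticurl = elasticurl;
  }

  @Setup
  public void setup() {
    lowLevelClient = RestClient.builder( HttpHost.create( elasticurl ) ).build();
    // the 5.6 high-level client wraps the low-level one and has no close() of its own
    client = new RestHighLevelClient( lowLevelClient );
  }

  @ProcessElement
  public void pe( ProcessContext c ) throws IOException {
    // index the JSON element; "my-index" and "my-type" are placeholders
    client.index( new IndexRequest( "my-index", "my-type" )
        .source( c.element(), XContentType.JSON ) );
  }

  @Teardown
  public void teardown() throws IOException {
    // closing the low-level client releases its I/O threads so the JVM can exit
    if ( lowLevelClient != null ) {
      lowLevelClient.close();
    }
  }
}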

ry

On Wed, Oct 25, 2017 at 6:09 AM, Etienne Chauchot  wrote:
> Hi,
>
> For maintenance reasons, we used the low-level REST client for ElasticsearchIO
> despite the fact that there is now a high-level client. Indeed, the IO
> production code is common to all Elasticsearch versions because the low-level
> REST client is compatible with all ES versions. This is not the case with the
> high-level client, whose major version has to match the targeted ES major
> version, which would lead to different IO versions for different ES versions.
>
> That is why IMHO it is not a good idea to use the high-level REST client in
> ElasticsearchIO.
>
> Regards,
>
> Etienne Chauchot
>
>
>
> On 23/10/2017 at 23:21, Ryan Bobko wrote:
>>
>> Thanks Tim,
>> I believe I'm doing what Jean-Baptiste recommends, so I guess I'll
>> have a look at the snapshot and see what's different. I don't mind
>> waiting a bit if it means I don't have to duplicate working code.
>>
>> ry
>>
>> On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson
>>  wrote:
>>>
>>> Hi Ryan,
>>>
>>> I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told
>>> 2.2.0 is expected within a couple of weeks.
>>> My work is only a proof of concept for now, but I put in 300M fairly small
>>> docs at around 100,000/sec on a 3-node cluster without any issue [1].
>>>
>>> Hope this helps,
>>> Tim
>>>
>>>
>>> [1]
>>>
>>> https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java
>>>
>>>
>>> On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré 
>>> wrote:

 Hi Ryan,

 the last version of ElasticsearchIO (that will be included in Beam
 2.2.0)
 supports Elasticsearch 5.x.

 The client should be created in the @Setup (or @StartBundle) and released
 cleanly in @Teardown (or @FinishBundle). Then, it's used in
 @ProcessElement
 to actually store the elements in the PCollection.

 Regards
 JB


 On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>
> Hi JB,
> Thanks for your input. I'm trying to update ElasticsearchIO, and
> hopefully learn a bit about Beam in the process. The documentation
> says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
> prefer not to have two ES libs in my classpath if I can avoid it. I'm
> just getting started, so my pipeline is quite simple:
>
> pipeline.apply( "Raw Reader", reader ) // read raw files
>   .apply( "Document Generator", ParDo.of( extractor ) ) //
> create my document objects for ES insertion
>   .apply( "Elastic Writer", new ElasticWriter( ... ) ); //
> upload to ES
>
>
> public final class ElasticWriter extends
> PTransform {
>
> private static final Logger log = LoggerFactory.getLogger(
> ElasticWriter.class );
> private final String elasticurl;
>
> public ElasticWriter( String url ) {
>   elasticurl = url;
> }
>
> @Override
> public PDone expand( PCollection input ) {
>   input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>   return PDone.in( input.getPipeline() );
> }
>
> public static class WriteFn extends DoFn implements
> Serializable {
>
>   private transient RestHighLevelClient client;
>   private final String elasticurl;
>
>   public WriteFn( String elasticurl ) {
> this.elasticurl = elasticurl;
>   }
>
>   @Setup
>   public void setup() {
> log.debug( " into WriteFn::setup" );
> HttpHost elastic = HttpHost.create( elasticurl );
> RestClientBuilder bldr = RestClient.builder( elastic );
>
> // if this is uncommented, the program never exits
> //client = new RestHighLevelClient( bldr.build() );
>   }
>
>   @Teardown
>   public void teardown() {
> log.debug( " into WriteFn::teardown" );
> // there's nothing to tear down
>   }
>
>   @ProcessElement
>   public void pe( ProcessContext c ) {
> Document doc = DocumentImpl.from( c.element() );
> log.debug( "writing {} to elastic", doc.getMetadata().first(
> Metadata.NAME ) );
>
> // this is where I want to write to ES, but for now, just write
> a text file
>
> ObjectMapper mpr = new ObjectMapper();
>
> try ( Writer fos = new BufferedWriter( new FileWriter( new
> 

Re: Infinite retry in streaming - is there a workaround?

2017-10-25 Thread Aleksandr
Hello Derek,
There is no general solution for a failing bundle. Some kinds of Dataflow
errors can be fixed with the Dataflow update feature. Another option is to
catch exceptions inside your ParDo functions.
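
For example, a minimal sketch of that dead-letter pattern (the class name,
tags and the toy parsing step are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    final TupleTag<Integer> okTag = new TupleTag<Integer>() {};
    final TupleTag<String> failedTag = new TupleTag<String>() {};

    PCollectionTuple results = p
        .apply(Create.of("1", "2", "not-a-number"))
        .apply("Parse", ParDo.of(new DoFn<String, Integer>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            try {
              c.output(Integer.parseInt(c.element()));
            } catch (Exception e) {
              // swallow the exception so the bundle is not retried forever
              // and send the bad element to a dead-letter output instead
              c.output(failedTag, c.element());
            }
          }
        }).withOutputTags(okTag, TupleTagList.of(failedTag)));

    // good elements continue through the pipeline
    PCollection<Integer> parsed = results.get(okTag);
    // failing elements can be logged or written to a dead-letter sink
    PCollection<String> failed = results.get(failedTag);

    p.run().waitUntilFinish();
  }
}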

On 25 Oct 2017 at 9:42 PM, "Griselda Cuevas" wrote:

Hi Derek, yes you can use that mailing list and also the SO channel.

Cheers,
G


> BTW, do you know if there's a Dataflow mailing list for questions like
> this? Would dataflow-feedback be the appropriate mailing list?
>
> Thanks,
>
> Derek
>
> On Wed, Oct 25, 2017 at 10:58 AM, Griselda Cuevas  wrote:
>
>> Hi Derek - It sounds like this is a Dataflow specific questions so I'd
>> recommend you also reach out through the Dataflow's Stack Overflow
>> 
>> channel. I'm also cc'ing Thomas Groh who might be able to help.
>>
>>
>>
>> On 20 October 2017 at 11:35, Derek Hao Hu  wrote:
>>
>>> ​Kindly ping as I'm really curious about this. :p
>>>
>>> Derek​
>>>
>>> On Thu, Oct 19, 2017 at 2:15 PM, Derek Hao Hu 
>>> wrote:
>>>
 Hi,

 ​We are trying to use Dataflow in Prod and right now one of our main
 concerns is this "infinite retry" behavior which might stall the whole
 pipeline.

 Right now for all the DoFns we've implemented ourselves we've added
 some error handling or exception swallowing mechanism to make sure some
 bundles can just fail and we log the exceptions. But we are a bit concerned
 about the other Beam native transforms which we can not easily wrap, e.g.
 PubSubIO transforms and DatastoreV1 transforms.

 A few days ago I asked a specific question in this group about how one
 can catch exception in DatastoreV1 transforms and the recommended approach
 is to 1) either duplicate the code in the current DatastoreV1
 implementation and swallow the exception instead of throwing or 2) Follow
 the implementation of BigQueryIO to add the ability to support custom retry
 policy. Both are feasible options but I'm a bit concerned in that doesn't
 that mean eventually all Beam native transforms need to implement something
 like 2) if we want to use them in Prod?
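
 For reference, option 2) in BigQueryIO looks roughly like the sketch below
 (the input collection and table spec are placeholders, the hook applies to
 streaming inserts, and the exact method names are per recent Beam releases):

 import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.values.PCollection;

 // rows: a PCollection<TableRow> produced earlier in the pipeline (placeholder)
 WriteResult result = rows.apply("Write to BigQuery",
     BigQueryIO.writeTableRows()
         .to("my-project:my_dataset.my_table")  // placeholder; table assumed to exist
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
         // retry only transient failures instead of retrying forever
         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

 // rows that were given up on come back here for logging or a dead-letter sink
 PCollection<TableRow> failedInserts = result.getFailedInserts();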

 So in short, I want to know right now what is the recommended approach
 or workaround to say, hey, just let this bundle fail and we can process the
 rest of the elements instead of just stall the pipeline?

 Thanks!
 --
 Derek Hao Hu

 Software Engineer | Snapchat
 Snap Inc.

>>>
>>>
>>>
>>> --
>>> Derek Hao Hu
>>>
>>> Software Engineer | Snapchat
>>> Snap Inc.
>>>
>>
>>
>
>
> --
> Derek Hao Hu
>
> Software Engineer | Snapchat
> Snap Inc.
>


Re: Infinite retry in streaming - is there a workaround?

2017-10-25 Thread Griselda Cuevas
Hi Derek, yes you can use that mailing list and also the SO channel.

Cheers,
G


> BTW, do you know if there's a Dataflow mailing list for questions like
> this? Would dataflow-feedback be the appropriate mailing list?
>
> Thanks,
>
> Derek
>
> On Wed, Oct 25, 2017 at 10:58 AM, Griselda Cuevas  wrote:
>
>> Hi Derek - It sounds like this is a Dataflow specific questions so I'd
>> recommend you also reach out through the Dataflow's Stack Overflow
>> 
>> channel. I'm also cc'ing Thomas Groh who might be able to help.
>>
>>
>>
>> On 20 October 2017 at 11:35, Derek Hao Hu  wrote:
>>
>>> ​Kindly ping as I'm really curious about this. :p
>>>
>>> Derek​
>>>
>>> On Thu, Oct 19, 2017 at 2:15 PM, Derek Hao Hu 
>>> wrote:
>>>
 Hi,

 ​We are trying to use Dataflow in Prod and right now one of our main
 concerns is this "infinite retry" behavior which might stall the whole
 pipeline.

 Right now for all the DoFns we've implemented ourselves we've added
 some error handling or exception swallowing mechanism to make sure some
 bundles can just fail and we log the exceptions. But we are a bit concerned
 about the other Beam native transforms which we can not easily wrap, e.g.
 PubSubIO transforms and DatastoreV1 transforms.

 A few days ago I asked a specific question in this group about how one
 can catch exception in DatastoreV1 transforms and the recommended approach
 is to 1) either duplicate the code in the current DatastoreV1
 implementation and swallow the exception instead of throwing or 2) Follow
 the implementation of BigQueryIO to add the ability to support custom retry
 policy. Both are feasible options but I'm a bit concerned in that doesn't
 that mean eventually all Beam native transforms need to implement something
 like 2) if we want to use them in Prod?

 So in short, I want to know right now what is the recommended approach
 or workaround to say, hey, just let this bundle fail and we can process the
 rest of the elements instead of just stall the pipeline?

 Thanks!
 --
 Derek Hao Hu

 Software Engineer | Snapchat
 Snap Inc.

>>>
>>>
>>>
>>> --
>>> Derek Hao Hu
>>>
>>> Software Engineer | Snapchat
>>> Snap Inc.
>>>
>>
>>
>
>
> --
> Derek Hao Hu
>
> Software Engineer | Snapchat
> Snap Inc.
>


Re: Infinite retry in streaming - is there a workaround?

2017-10-25 Thread Derek Hao Hu
Aha, I just realized this is a Dataflow behavior, not a Beam default
behavior. :) Thanks Griselda. I'll post in the SO channel.

BTW, do you know if there's a Dataflow mailing list for questions like
this? Would dataflow-feedback be the appropriate mailing list?

Thanks,

Derek

On Wed, Oct 25, 2017 at 10:58 AM, Griselda Cuevas  wrote:

> Hi Derek - It sounds like this is a Dataflow specific questions so I'd
> recommend you also reach out through the Dataflow's Stack Overflow
> 
> channel. I'm also cc'ing Thomas Groh who might be able to help.
>
>
>
> On 20 October 2017 at 11:35, Derek Hao Hu  wrote:
>
>> ​Kindly ping as I'm really curious about this. :p
>>
>> Derek​
>>
>> On Thu, Oct 19, 2017 at 2:15 PM, Derek Hao Hu 
>> wrote:
>>
>>> Hi,
>>>
>>> ​We are trying to use Dataflow in Prod and right now one of our main
>>> concerns is this "infinite retry" behavior which might stall the whole
>>> pipeline.
>>>
>>> Right now for all the DoFns we've implemented ourselves we've added some
>>> error handling or exception swallowing mechanism to make sure some bundles
>>> can just fail and we log the exceptions. But we are a bit concerned about
>>> the other Beam native transforms which we can not easily wrap, e.g.
>>> PubSubIO transforms and DatastoreV1 transforms.
>>>
>>> A few days ago I asked a specific question in this group about how one
>>> can catch exception in DatastoreV1 transforms and the recommended approach
>>> is to 1) either duplicate the code in the current DatastoreV1
>>> implementation and swallow the exception instead of throwing or 2) Follow
>>> the implementation of BigQueryIO to add the ability to support custom retry
>>> policy. Both are feasible options but I'm a bit concerned in that doesn't
>>> that mean eventually all Beam native transforms need to implement something
>>> like 2) if we want to use them in Prod?
>>>
>>> So in short, I want to know right now what is the recommended approach
>>> or workaround to say, hey, just let this bundle fail and we can process the
>>> rest of the elements instead of just stall the pipeline?
>>>
>>> Thanks!
>>> --
>>> Derek Hao Hu
>>>
>>> Software Engineer | Snapchat
>>> Snap Inc.
>>>
>>
>>
>>
>> --
>> Derek Hao Hu
>>
>> Software Engineer | Snapchat
>> Snap Inc.
>>
>
>


-- 
Derek Hao Hu

Software Engineer | Snapchat
Snap Inc.


Re: Infinite retry in streaming - is there a workaround?

2017-10-25 Thread Griselda Cuevas
Hi Derek - It sounds like this is a Dataflow-specific question, so I'd
recommend you also reach out through Dataflow's Stack Overflow channel.
I'm also cc'ing Thomas Groh who might be able to help.



On 20 October 2017 at 11:35, Derek Hao Hu  wrote:

> ​Kindly ping as I'm really curious about this. :p
>
> Derek​
>
> On Thu, Oct 19, 2017 at 2:15 PM, Derek Hao Hu 
> wrote:
>
>> Hi,
>>
>> ​We are trying to use Dataflow in Prod and right now one of our main
>> concerns is this "infinite retry" behavior which might stall the whole
>> pipeline.
>>
>> Right now for all the DoFns we've implemented ourselves we've added some
>> error handling or exception swallowing mechanism to make sure some bundles
>> can just fail and we log the exceptions. But we are a bit concerned about
>> the other Beam native transforms which we can not easily wrap, e.g.
>> PubSubIO transforms and DatastoreV1 transforms.
>>
>> A few days ago I asked a specific question in this group about how one
>> can catch exception in DatastoreV1 transforms and the recommended approach
>> is to 1) either duplicate the code in the current DatastoreV1
>> implementation and swallow the exception instead of throwing or 2) Follow
>> the implementation of BigQueryIO to add the ability to support custom retry
>> policy. Both are feasible options but I'm a bit concerned in that doesn't
>> that mean eventually all Beam native transforms need to implement something
>> like 2) if we want to use them in Prod?
>>
>> So in short, I want to know right now what is the recommended approach or
>> workaround to say, hey, just let this bundle fail and we can process the
>> rest of the elements instead of just stall the pipeline?
>>
>> Thanks!
>> --
>> Derek Hao Hu
>>
>> Software Engineer | Snapchat
>> Snap Inc.
>>
>
>
>
> --
> Derek Hao Hu
>
> Software Engineer | Snapchat
> Snap Inc.
>


Re: ElasticSearch with RestHighLevelClient

2017-10-25 Thread Etienne Chauchot

Hi,

For maintenance reasons, we used the low-level REST client for ElasticsearchIO
despite the fact that there is now a high-level client. Indeed, the IO
production code is common to all Elasticsearch versions because the low-level
REST client is compatible with all ES versions. This is not the case with the
high-level client, whose major version has to match the targeted ES major
version, which would lead to different IO versions for different ES versions.
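
For illustration, a request through the low-level client is just plain HTTP
with a raw JSON body, so nothing in it is tied to a particular ES major
version (the host, index and document below are placeholders):

import java.util.Collections;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class LowLevelClientExample {
  public static void main(String[] args) throws Exception {
    // the client only needs an HTTP endpoint, not a version-specific transport
    RestClient restClient =
        RestClient.builder(new HttpHost("localhost", 9200, "http")).build();

    // index a document by POSTing raw JSON; the endpoint and body are plain strings
    String json = "{\"message\":\"hello\"}";
    Response response = restClient.performRequest(
        "POST", "/my-index/my-type",
        Collections.emptyMap(),
        new NStringEntity(json, ContentType.APPLICATION_JSON));

    System.out.println(response.getStatusLine());
    restClient.close();
  }
}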


That is why IMHO it is not a good idea to use the high-level REST client
in ElasticsearchIO.


Regards,

Etienne Chauchot


On 23/10/2017 at 23:21, Ryan Bobko wrote:

Thanks Tim,
I believe I'm doing what Jean-Baptiste recommends, so I guess I'll
have a look at the snapshot and see what's different. I don't mind
waiting a bit if it means I don't have to duplicate working code.

ry

On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson
 wrote:

Hi Ryan,

I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told
2.2.0 is expected within a couple of weeks.
My work is only a proof of concept for now, but I put in 300M fairly small
docs at around 100,000/sec on a 3-node cluster without any issue [1].

Hope this helps,
Tim


[1]
https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java


On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré 
wrote:

Hi Ryan,

the last version of ElasticsearchIO (that will be included in Beam 2.2.0)
supports Elasticsearch 5.x.

The client should be created in the @Setup (or @StartBundle) and released
cleanly in @Teardown (or @FinishBundle). Then, it's used in @ProcessElement
to actually store the elements in the PCollection.

Regards
JB


On 10/23/2017 08:53 PM, Ryan Bobko wrote:

Hi JB,
Thanks for your input. I'm trying to update ElasticsearchIO, and
hopefully learn a bit about Beam in the process. The documentation
says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
prefer not to have two ES libs in my classpath if I can avoid it. I'm
just getting started, so my pipeline is quite simple:

pipeline.apply( "Raw Reader", reader ) // read raw files
  .apply( "Document Generator", ParDo.of( extractor ) ) //
create my document objects for ES insertion
  .apply( "Elastic Writer", new ElasticWriter( ... ); //
upload to ES


public final class ElasticWriter extends
PTransform {

private static final Logger log = LoggerFactory.getLogger(
ElasticWriter.class );
private final String elasticurl;

public ElasticWriter( String url ) {
  elasticurl = url;
}

@Override
public PDone expand( PCollection input ) {
  input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
  return PDone.in( input.getPipeline() );
}

public static class WriteFn extends DoFn implements
Serializable {

  private transient RestHighLevelClient client;
  private final String elasticurl;

  public WriteFn( String elasticurl ) {
this.elasticurl = elasticurl;
  }

  @Setup
  public void setup() {
log.debug( " into WriteFn::setup" );
HttpHost elastic = HttpHost.create( elasticurl );
RestClientBuilder bldr = RestClient.builder( elastic );

// if this is uncommented, the program never exits
//client = new RestHighLevelClient( bldr.build() );
  }

  @Teardown
  public void teardown() {
log.debug( " into WriteFn::teardown" );
// there's nothing to tear down
  }

  @ProcessElement
  public void pe( ProcessContext c ) {
Document doc = DocumentImpl.from( c.element() );
log.debug( "writing {} to elastic", doc.getMetadata().first(
Metadata.NAME ) );

// this is where I want to write to ES, but for now, just write
a text file

ObjectMapper mpr = new ObjectMapper();

try ( Writer fos = new BufferedWriter( new FileWriter( new File(
"/tmp/writers",
doc.getMetadata().first( Metadata.NAME ).asString() ) ) )
) {
  mpr.writeValue( fos, doc );
}
catch ( IOException ioe ) {
  log.error( ioe.getLocalizedMessage(), ioe );
}
  }
}
}


On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré 
wrote:

Hi Ryan,

Why don't you use the ElasticsearchIO for that ?
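
For reference, writing with ElasticsearchIO looks roughly like this (the
addresses, index and type are placeholders, and the input is a PCollection
of JSON strings):

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.values.PCollection;

// jsonDocs: a PCollection<String> of JSON documents (placeholder name)
jsonDocs.apply("Write to ES",
    ElasticsearchIO.write()
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                new String[] { "http://localhost:9200" },  // placeholder address
                "my-index", "my-type")));                  // placeholder index/type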

Anyway, can you share your pipeline where you have the ParDo calling
your
DoFn ?

Thanks,
Regards
JB


On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:


Hi List,
I'm trying to write an updated ElasticSearch client using the
newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
interested in writes at this time, so I'm using the
ElasticsearchIO.write()
function as a model. I have a transient member named client. Here's my
setup
function for my DoFn:

@Setup
public void