Re: ElasticSearch with RestHighLevelClient
Hi Etienne,

That was just the hint I needed! Until Beam 2.2 is released, my solution is to keep a handle to the low-level client and close it in the teardown method. This seems to solve the threading issue.

ry

On Wed, Oct 25, 2017 at 6:09 AM, Etienne Chauchot wrote:
> Hi,
>
> For maintenance reasons, we use the low-level REST client in ElasticsearchIO even though the high-level client now exists. The IO production code is common to all Elasticsearch versions because the low-level REST client is compatible with all ES versions. This is not the case with the high-level client, whose major version has to match the targeted ES major version, which would lead to different IO versions for different ES versions.
>
> That is why, IMHO, it is not a good idea to use the high-level REST client in ElasticsearchIO.
>
> Regards,
>
> Etienne Chauchot
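A minimal sketch of the workaround described in this message: keep a handle to the low-level client and close it in teardown. It uses only the JDK so it can be run as-is. LowLevelClient and HighLevelClient are hypothetical stand-ins, not the Elasticsearch classes, and the Beam annotations appear only as comments. The assumption being illustrated is that the real low-level client owns non-daemon I/O threads, which would keep the JVM alive until the client is closed.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TeardownSketch {

    /** Hypothetical stand-in for the low-level client: it owns non-daemon worker threads. */
    static final class LowLevelClient implements Closeable {
        private final ExecutorService io = Executors.newFixedThreadPool(1);

        void send(String json) {
            io.submit(() -> { /* a real client would perform the HTTP call here */ });
        }

        boolean isClosed() {
            return io.isShutdown();
        }

        @Override
        public void close() {
            io.shutdown(); // lets the worker thread finish so the JVM can exit
        }
    }

    /** Hypothetical stand-in for the high-level client: a wrapper with no close() of its own. */
    static final class HighLevelClient {
        private final LowLevelClient low;

        HighLevelClient(LowLevelClient low) {
            this.low = low;
        }

        void index(String json) {
            low.send(json);
        }
    }

    /** Mirrors the shape of WriteFn from the thread. */
    static final class WriteFn {
        private transient LowLevelClient lowLevel; // the handle kept for teardown
        private transient HighLevelClient client;

        // @Setup
        void setup() {
            lowLevel = new LowLevelClient();
            client = new HighLevelClient(lowLevel);
        }

        // @ProcessElement
        void processElement(String json) {
            client.index(json);
        }

        // @Teardown
        void teardown() {
            if (lowLevel != null) {
                lowLevel.close();
            }
        }

        boolean released() {
            return lowLevel != null && lowLevel.isClosed();
        }
    }

    public static void main(String[] args) {
        WriteFn fn = new WriteFn();
        fn.setup();
        fn.processElement("{\"name\":\"example\"}");
        fn.teardown();
        System.out.println("released=" + fn.released());
    }
}
```

If the close() call in teardown() is removed, the idle worker thread in this sketch keeps the process running after main returns, which resembles the "program never exits" symptom reported in the thread.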
Re: ElasticSearch with RestHighLevelClient
Hi,

For maintenance reasons, we use the low-level REST client in ElasticsearchIO even though the high-level client now exists. The IO production code is common to all Elasticsearch versions because the low-level REST client is compatible with all ES versions. This is not the case with the high-level client, whose major version has to match the targeted ES major version, which would lead to different IO versions for different ES versions.

That is why, IMHO, it is not a good idea to use the high-level REST client in ElasticsearchIO.

Regards,

Etienne Chauchot

On 23/10/2017 at 23:21, Ryan Bobko wrote:
> Thanks Tim,
> I believe I'm doing what Jean-Baptiste recommends, so I guess I'll have a look at the snapshot and see what's different. I don't mind waiting a bit if it means I don't have to duplicate working code.
>
> ry
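The version independence described in this message comes from the low-level client exchanging plain HTTP and JSON. As a rough illustration, and only as an assumption about how such a writer could be built, the sketch below assembles the newline-delimited body that the Elasticsearch `_bulk` endpoint accepts when the index and type are given in the request URL. BulkPayloadSketch and bulkBody are hypothetical names; sending the body with the low-level client is not shown.

```java
import java.util.Arrays;
import java.util.List;

public class BulkPayloadSketch {

    /**
     * Builds a bulk request body: one action line followed by one source line per document.
     * Each document must already be a single-line JSON string.
     */
    static String bulkBody(List<String> jsonDocs) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append("{ \"index\" : {} }\n"); // action line; index and type come from the URL
            body.append(doc).append('\n');       // source line; the body must end with a newline
        }
        return body.toString();
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("{\"name\":\"a\"}", "{\"name\":\"b\"}");
        System.out.print(bulkBody(docs));
    }
}
```

Because the payload is just text, nothing in this step depends on the major version of a client library, which is the maintenance argument made above.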
Re: ElasticSearch with RestHighLevelClient
Thanks Tim,
I believe I'm doing what Jean-Baptiste recommends, so I guess I'll have a look at the snapshot and see what's different. I don't mind waiting a bit if it means I don't have to duplicate working code.

ry

On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson wrote:
> Hi Ryan,
>
> I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told 2.2.0 is expected within a couple weeks.
> My work is only a proof of concept for now, but I put in 300M fairly small docs at around 100,000/sec on a 3 node cluster without any issue [1].
>
> Hope this helps,
> Tim
>
> [1] https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java
Re: ElasticSearch with RestHighLevelClient
Hi Ryan,

I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told 2.2.0 is expected within a couple of weeks.
My work is only a proof of concept for now, but I put in 300M fairly small docs at around 100,000/sec on a 3-node cluster without any issue [1].

Hope this helps,
Tim

[1] https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java

On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré wrote:
> Hi Ryan,
>
> the latest version of ElasticsearchIO (that will be included in Beam 2.2.0) supports Elasticsearch 5.x.
>
> The client should be created in @Setup (or @StartBundle) and released cleanly in @Teardown (or @FinishBundle). Then, it's used in @ProcessElement to actually store the elements of the PCollection.
>
> Regards
> JB
Re: ElasticSearch with RestHighLevelClient
Hi Ryan,

the latest version of ElasticsearchIO (that will be included in Beam 2.2.0) supports Elasticsearch 5.x.

The client should be created in @Setup (or @StartBundle) and released cleanly in @Teardown (or @FinishBundle). Then, it's used in @ProcessElement to actually store the elements of the PCollection.

Regards
JB

On 10/23/2017 08:53 PM, Ryan Bobko wrote:
> Hi JB,
> Thanks for your input. I'm trying to update ElasticsearchIO, and hopefully learn a bit about Beam in the process. The documentation says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd prefer not to have two ES libs in my classpath if I can avoid it. I'm just getting started, so my pipeline is quite simple:
> [...]

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
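The lifecycle described in this message can be sketched with a toy runner. This is not Beam code: LoggingFn and run are hypothetical names, and the methods stand in for the @Setup, @StartBundle, @ProcessElement, @FinishBundle and @Teardown callbacks. The sketch assumes the usual ordering, where setup and teardown happen once per DoFn instance and the bundle callbacks wrap each batch of elements; a real runner may reuse instances or skip teardown on a crash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DoFnLifecycleSketch {

    /** Records the order in which the lifecycle callbacks are invoked. */
    static final class LoggingFn {
        final List<String> calls = new ArrayList<>();

        void setup() { calls.add("setup"); }                 // create the client here
        void startBundle() { calls.add("startBundle"); }
        void processElement(String element) { calls.add("process:" + element); }
        void finishBundle() { calls.add("finishBundle"); }   // flush any batched writes here
        void teardown() { calls.add("teardown"); }           // close the client here
    }

    /** Toy runner: one instance, several bundles, teardown always attempted. */
    static List<String> run(List<List<String>> bundles) {
        LoggingFn fn = new LoggingFn();
        fn.setup();
        try {
            for (List<String> bundle : bundles) {
                fn.startBundle();
                for (String element : bundle) {
                    fn.processElement(element);
                }
                fn.finishBundle();
            }
        } finally {
            fn.teardown();
        }
        return fn.calls;
    }

    public static void main(String[] args) {
        List<List<String>> bundles = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(run(bundles));
    }
}
```

Creating the client in setup rather than startBundle avoids reconnecting for every bundle, at the cost of having to release it explicitly in teardown.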
Re: ElasticSearch with RestHighLevelClient
Hi JB,
Thanks for your input. I'm trying to update ElasticsearchIO, and hopefully learn a bit about Beam in the process. The documentation says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd prefer not to have two ES libs in my classpath if I can avoid it. I'm just getting started, so my pipeline is quite simple:

    pipeline.apply( "Raw Reader", reader ) // read raw files
        .apply( "Document Generator", ParDo.of( extractor ) ) // create my document objects for ES insertion
        .apply( "Elastic Writer", new ElasticWriter( ... ) ); // upload to ES


    public final class ElasticWriter extends PTransform {

      private static final Logger log = LoggerFactory.getLogger( ElasticWriter.class );
      private final String elasticurl;

      public ElasticWriter( String url ) {
        elasticurl = url;
      }

      @Override
      public PDone expand( PCollection input ) {
        input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
        return PDone.in( input.getPipeline() );
      }

      public static class WriteFn extends DoFn implements Serializable {

        private transient RestHighLevelClient client;
        private final String elasticurl;

        public WriteFn( String elasticurl ) {
          this.elasticurl = elasticurl;
        }

        @Setup
        public void setup() {
          log.debug( " into WriteFn::setup" );
          HttpHost elastic = HttpHost.create( elasticurl );
          RestClientBuilder bldr = RestClient.builder( elastic );

          // if this is uncommented, the program never exits
          //client = new RestHighLevelClient( bldr.build() );
        }

        @Teardown
        public void teardown() {
          log.debug( " into WriteFn::teardown" );
          // there's nothing to tear down
        }

        @ProcessElement
        public void pe( ProcessContext c ) {
          Document doc = DocumentImpl.from( c.element() );
          log.debug( "writing {} to elastic", doc.getMetadata().first( Metadata.NAME ) );

          // this is where I want to write to ES, but for now, just write a text file

          ObjectMapper mpr = new ObjectMapper();

          try ( Writer fos = new BufferedWriter( new FileWriter( new File( "/tmp/writers",
              doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
            mpr.writeValue( fos, doc );
          }
          catch ( IOException ioe ) {
            log.error( ioe.getLocalizedMessage(), ioe );
          }
        }
      }
    }

On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré wrote:
> Hi Ryan,
>
> Why don't you use the ElasticsearchIO for that?
>
> Anyway, can you share your pipeline where you have the ParDo calling your DoFn?
>
> Thanks,
> Regards
> JB
Re: ElasticSearch with RestHighLevelClient
Hi Ryan,

Why don't you use the ElasticsearchIO for that?

Anyway, can you share your pipeline where you have the ParDo calling your DoFn?

Thanks,
Regards
JB

On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
> Hi List,
> I'm trying to write an updated ElasticSearch client using the newly-published RestHighLevelClient class (with ES 5.6.0). I'm only interested in writes at this time, so I'm using the ElasticsearchIO.write() function as a model. I have a transient member named client. Here's my setup function for my DoFn:
>
>     @Setup
>     public void setup() {
>       HttpHost elastic = HttpHost.create( elasticurl );
>       RestClientBuilder bldr = RestClient.builder( elastic );
>       client = new RestHighLevelClient( bldr.build() );
>     }
>
> If I run the code as shown, I eventually get the debug message: "Pipeline has terminated. Shutting down." but the program never exits. If I comment out the client assignment above, the pipeline behaves normally (but obviously, I can't write anything to ES).
>
> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>
> ry

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com