Hi Etienne,

That was just the hint I needed! Until Beam 2.2 is released, my solution is to keep a handle to the low-level client and close it in the teardown method. This seems to solve the threading issue.
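
A minimal sketch of that workaround, under stated assumptions rather than the actual pipeline code. To stay self-contained it uses no Beam or Elasticsearch classes: `LowLevelClient` and `HighLevelClient` are hypothetical stand-ins for RestClient and RestHighLevelClient, and the `setup()` / `processElement()` / `teardown()` methods of `WriteFn` only mimic what Beam would call through @Setup, @ProcessElement and @Teardown. It also assumes that the hang comes from non-daemon threads owned by the low-level client, and that the high-level wrapper offers no close method of its own.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TeardownSketch {

    /** Hypothetical stand-in for the low-level client: it owns non-daemon worker threads. */
    static final class LowLevelClient implements AutoCloseable {
        // The default thread factory creates non-daemon threads, so a pool that
        // has run a task and is never shut down keeps the JVM alive.
        private final ExecutorService io = Executors.newFixedThreadPool(2);

        void send(String doc) {
            io.submit(() -> System.out.println("sent " + doc));
        }

        boolean isClosed() {
            return io.isTerminated();
        }

        @Override
        public void close() {
            io.shutdown(); // stop accepting work, let queued tasks finish
            try {
                io.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Hypothetical stand-in for the high-level client: wraps the low-level one, no close(). */
    static final class HighLevelClient {
        private final LowLevelClient delegate;

        HighLevelClient(LowLevelClient delegate) {
            this.delegate = delegate;
        }

        void index(String doc) {
            delegate.send(doc);
        }
    }

    /** Mimics the DoFn lifecycle; in Beam these methods would carry the annotations. */
    static final class WriteFn {
        private transient LowLevelClient lowLevel; // the handle kept for teardown
        private transient HighLevelClient client;

        void setup() {
            lowLevel = new LowLevelClient();
            client = new HighLevelClient(lowLevel);
        }

        void processElement(String doc) {
            client.index(doc);
        }

        void teardown() {
            if (lowLevel != null) {
                lowLevel.close(); // releases the worker threads so the JVM can exit
                lowLevel = null;
                client = null;
            }
        }

        boolean isOpen() {
            return lowLevel != null && !lowLevel.isClosed();
        }
    }

    public static void main(String[] args) {
        WriteFn fn = new WriteFn();
        fn.setup();
        fn.processElement("doc-1");
        System.out.println("open after setup: " + fn.isOpen());
        fn.teardown();
        System.out.println("open after teardown: " + fn.isOpen());
    }
}
```

If the `lowLevel.close()` call in `teardown()` is removed, the stand-in pool's worker thread stays alive after `main` returns, which is the same symptom as the pipeline that never exits.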
ry

On Wed, Oct 25, 2017 at 6:09 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
> Hi,
>
> We have used the low level rest client for the ElasticsearchIO despite the
> fact that there is now the high level client for maintenance reasons.
> Indeed, the IO production code is common to all Elasticsearch versions
> because the low level Rest client is compatible with all ES versions. This
> is not the case with the high level client which major version has to be the
> same as the targeted ES major version which will lead to having different IO
> versions for different ES versions.
>
> That is why IMHO it is not a good idea to use the high level Rest client in
> the ElasticsearchIO.
>
> Regards,
>
> Etienne Chauchot
>
>
> On 23/10/2017 at 23:21, Ryan Bobko wrote:
>>
>> Thanks Tim,
>> I believe I'm doing what Jean-Baptiste recommends, so I guess I'll
>> have a look at the snapshot and see what's different. I don't mind
>> waiting a bit if it means I don't have to duplicate working code.
>>
>> ry
>>
>> On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson
>> <timrobertson...@gmail.com> wrote:
>>>
>>> Hi Ryan,
>>>
>>> I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am
>>> told 2.2.0 is expected within a couple weeks.
>>> My work is only a proof of concept for now, but I put in 300M fairly
>>> small docs at around 100,000/sec on a 3 node cluster without any
>>> issue [1].
>>>
>>> Hope this helps,
>>> Tim
>>>
>>>
>>> [1] https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java
>>>
>>>
>>> On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>> wrote:
>>>>
>>>> Hi Ryan,
>>>>
>>>> the last version of ElasticsearchIO (that will be included in Beam
>>>> 2.2.0) supports Elasticsearch 5.x.
>>>>
>>>> The client should be created in the @Setup (or @StartBundle) and release
>>>> cleanly in @Teardown (or @FinishBundle). Then, it's used in
>>>> @ProcessElement to actually store the elements in the PCollection.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>>
>>>> On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>>>>>
>>>>> Hi JB,
>>>>> Thanks for your input. I'm trying to update ElasticsearchIO, and
>>>>> hopefully learn a bit about Beam in the process. The documentation
>>>>> says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
>>>>> prefer not to have two ES libs in my classpath if I can avoid it. I'm
>>>>> just getting started, so my pipeline is quite simple:
>>>>>
>>>>> pipeline.apply( "Raw Reader", reader ) // read raw files
>>>>>     .apply( "Document Generator", ParDo.of( extractor ) ) // create my document objects for ES insertion
>>>>>     .apply( "Elastic Writer", new ElasticWriter( ... ) ); // upload to ES
>>>>>
>>>>>
>>>>> public final class ElasticWriter extends PTransform<PCollection<Document>, PDone> {
>>>>>
>>>>>   private static final Logger log = LoggerFactory.getLogger( ElasticWriter.class );
>>>>>   private final String elasticurl;
>>>>>
>>>>>   public ElasticWriter( String url ) {
>>>>>     elasticurl = url;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public PDone expand( PCollection<Document> input ) {
>>>>>     input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>>>>>     return PDone.in( input.getPipeline() );
>>>>>   }
>>>>>
>>>>>   public static class WriteFn extends DoFn<Document, Void> implements Serializable {
>>>>>
>>>>>     private transient RestHighLevelClient client;
>>>>>     private final String elasticurl;
>>>>>
>>>>>     public WriteFn( String elasticurl ) {
>>>>>       this.elasticurl = elasticurl;
>>>>>     }
>>>>>
>>>>>     @Setup
>>>>>     public void setup() {
>>>>>       log.debug( "******************** into WriteFn::setup" );
>>>>>       HttpHost elastic = HttpHost.create( elasticurl );
>>>>>       RestClientBuilder bldr = RestClient.builder( elastic );
>>>>>
>>>>>       // if this is uncommented, the program never exits
>>>>>       //client = new RestHighLevelClient( bldr.build() );
>>>>>     }
>>>>>
>>>>>     @Teardown
>>>>>     public void teardown() {
>>>>>       log.debug( "******************** into WriteFn::teardown" );
>>>>>       // there's nothing to tear down
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>     public void pe( ProcessContext c ) {
>>>>>       Document doc = DocumentImpl.from( c.element() );
>>>>>       log.debug( "writing {} to elastic", doc.getMetadata().first( Metadata.NAME ) );
>>>>>
>>>>>       // this is where I want to write to ES, but for now, just write a text file
>>>>>
>>>>>       ObjectMapper mpr = new ObjectMapper();
>>>>>
>>>>>       try ( Writer fos = new BufferedWriter( new FileWriter( new File(
>>>>>           "/tmp/writers",
>>>>>           doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
>>>>>         mpr.writeValue( fos, doc );
>>>>>       }
>>>>>       catch ( IOException ioe ) {
>>>>>         log.error( ioe.getLocalizedMessage(), ioe );
>>>>>       }
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>> wrote:
>>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> Why don't you use the ElasticsearchIO for that ?
>>>>>>
>>>>>> Anyway, can you share your pipeline where you have the ParDo calling
>>>>>> your DoFn ?
>>>>>>
>>>>>> Thanks,
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>>
>>>>>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>>>>>>
>>>>>>> Hi List,
>>>>>>> I'm trying to write an updated ElasticSearch client using the
>>>>>>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
>>>>>>> interested in writes at this time, so I'm using the
>>>>>>> ElasticsearchIO.write() function as a model. I have a transient
>>>>>>> member named client. Here's my setup function for my DoFn:
>>>>>>>
>>>>>>> @Setup
>>>>>>> public void setup() {
>>>>>>>   HttpHost elastic = HttpHost.create( elasticurl );
>>>>>>>   RestClientBuilder bldr = RestClient.builder( elastic );
>>>>>>>   client = new RestHighLevelClient( bldr.build() );
>>>>>>> }
>>>>>>>
>>>>>>> If I run the code as shown, I eventually get the debug message:
>>>>>>> "Pipeline has terminated. Shutting down." but the program never
>>>>>>> exits. If I comment out the client assignment above, the pipeline
>>>>>>> behaves normally (but obviously, I can't write anything to ES).
>>>>>>>
>>>>>>> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>>>>>>>
>>>>>>> ry
>>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> jbono...@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com