Hi Ryan,

I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told 2.2.0 is expected within a couple of weeks. My work is only a proof of concept for now, but I put in 300M fairly small docs at around 100,000 docs/sec on a 3-node cluster without any issue [1].
Hope this helps,
Tim

[1] https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java

On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi Ryan,
>
> the last version of ElasticsearchIO (that will be included in Beam 2.2.0)
> supports Elasticsearch 5.x.
>
> The client should be created in the @Setup (or @StartBundle) and released
> cleanly in @Teardown (or @FinishBundle). Then, it's used in @ProcessElement
> to actually store the elements of the PCollection.
>
> Regards
> JB
>
> On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>> Hi JB,
>> Thanks for your input. I'm trying to update ElasticsearchIO, and
>> hopefully learn a bit about Beam in the process. The documentation
>> says ElasticsearchIO only works with ES 2.x, and I'm using ES 5.6. I'd
>> prefer not to have two ES libs in my classpath if I can avoid it. I'm
>> just getting started, so my pipeline is quite simple:
>>
>> pipeline.apply( "Raw Reader", reader )                      // read raw files
>>     .apply( "Document Generator", ParDo.of( extractor ) )   // create my document objects for ES insertion
>>     .apply( "Elastic Writer", new ElasticWriter( ... ) );   // upload to ES
>>
>>
>> public final class ElasticWriter extends
>>     PTransform<PCollection<Document>, PDone> {
>>
>>   private static final Logger log =
>>       LoggerFactory.getLogger( ElasticWriter.class );
>>   private final String elasticurl;
>>
>>   public ElasticWriter( String url ) {
>>     elasticurl = url;
>>   }
>>
>>   @Override
>>   public PDone expand( PCollection<Document> input ) {
>>     input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>>     return PDone.in( input.getPipeline() );
>>   }
>>
>>   public static class WriteFn extends DoFn<Document, Void> implements
>>       Serializable {
>>
>>     private transient RestHighLevelClient client;
>>     private final String elasticurl;
>>
>>     public WriteFn( String elasticurl ) {
>>       this.elasticurl = elasticurl;
>>     }
>>
>>     @Setup
>>     public void setup() {
>>       log.debug( "******************** into WriteFn::setup" );
>>       HttpHost elastic = HttpHost.create( elasticurl );
>>       RestClientBuilder bldr = RestClient.builder( elastic );
>>
>>       // if this is uncommented, the program never exits
>>       //client = new RestHighLevelClient( bldr.build() );
>>     }
>>
>>     @Teardown
>>     public void teardown() {
>>       log.debug( "******************** into WriteFn::teardown" );
>>       // there's nothing to tear down
>>     }
>>
>>     @ProcessElement
>>     public void pe( ProcessContext c ) {
>>       Document doc = DocumentImpl.from( c.element() );
>>       log.debug( "writing {} to elastic",
>>           doc.getMetadata().first( Metadata.NAME ) );
>>
>>       // this is where I want to write to ES, but for now, just write
>>       // a text file
>>
>>       ObjectMapper mpr = new ObjectMapper();
>>
>>       try ( Writer fos = new BufferedWriter( new FileWriter( new File(
>>           "/tmp/writers",
>>           doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
>>         mpr.writeValue( fos, doc );
>>       }
>>       catch ( IOException ioe ) {
>>         log.error( ioe.getLocalizedMessage(), ioe );
>>       }
>>     }
>>   }
>> }
>>
>>
>> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>> Hi Ryan,
>>>
>>> Why don't you use the ElasticsearchIO for that?
>>>
>>> Anyway, can you share your pipeline where you have the ParDo calling your
>>> DoFn?
>>>
>>> Thanks,
>>> Regards
>>> JB
>>>
>>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>>>
>>>> Hi List,
>>>> I'm trying to write an updated ElasticSearch client using the
>>>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
>>>> interested in writes at this time, so I'm using the
>>>> ElasticsearchIO.write() function as a model. I have a transient member
>>>> named client. Here's my setup function for my DoFn:
>>>>
>>>> @Setup
>>>> public void setup() {
>>>>   HttpHost elastic = HttpHost.create( elasticurl );
>>>>   RestClientBuilder bldr = RestClient.builder( elastic );
>>>>   client = new RestHighLevelClient( bldr.build() );
>>>> }
>>>>
>>>> If I run the code as shown, I eventually get the debug message:
>>>> "Pipeline has terminated. Shutting down." but the program never exits.
>>>> If I comment out the client assignment above, the pipeline behaves
>>>> normally (but obviously, I can't write anything to ES).
>>>>
>>>> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>>>>
>>>> ry
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
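
[Editor's note] The "never exits" symptom Ryan describes is consistent with the low-level RestClient's non-daemon I/O threads keeping the JVM alive after the pipeline terminates: the 5.6 high-level client wraps a RestClient, and it is the RestClient that must be closed. A minimal sketch of the lifecycle JB describes (create in @Setup, release in @Teardown), assuming the ES 5.6 REST client and Beam 2.x; `Document`, `DocumentImpl`, and the index/type names are illustrative placeholders, not from any real API:

```java
import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import com.fasterxml.jackson.databind.ObjectMapper;

public class WriteFn extends DoFn<Document, Void> {

  private final String elasticurl;
  // Keep a handle on the low-level client: in the 5.6 API it is the
  // RestClient (not the RestHighLevelClient wrapper) that owns the
  // connection pool and must be closed.
  private transient RestClient lowLevelClient;
  private transient RestHighLevelClient client;

  public WriteFn( String elasticurl ) {
    this.elasticurl = elasticurl;
  }

  @Setup
  public void setup() {
    lowLevelClient = RestClient.builder( HttpHost.create( elasticurl ) ).build();
    client = new RestHighLevelClient( lowLevelClient );
  }

  @Teardown
  public void teardown() throws IOException {
    if ( lowLevelClient != null ) {
      // Releases the non-daemon I/O threads so the JVM can exit.
      lowLevelClient.close();
    }
  }

  @ProcessElement
  public void processElement( ProcessContext c ) throws IOException {
    Document doc = DocumentImpl.from( c.element() );
    // Single synchronous index call per element, for clarity; a real
    // writer would batch via BulkRequest for throughput.
    IndexRequest req = new IndexRequest( "documents", "doc" )
        .source( new ObjectMapper().writeValueAsString( doc ),
                 XContentType.JSON );
    client.index( req );
  }
}
```

Note that @Teardown is not guaranteed to run on every failure path, which is one reason the released ElasticsearchIO manages its client lifecycle internally rather than leaving it to user code.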