Thanks Tim, I believe I'm doing what Jean-Baptiste recommends, so I guess I'll have a look at the snapshot and see what's different. I don't mind waiting a bit if it means I don't have to duplicate working code.
ry

On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson <timrobertson...@gmail.com> wrote:
> Hi Ryan,
>
> I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told
> 2.2.0 is expected within a couple of weeks.
> My work is only a proof of concept for now, but I put in 300M fairly small
> docs at around 100,000/sec on a 3-node cluster without any issue [1].
>
> Hope this helps,
> Tim
>
> [1] https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java
>
> On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> Hi Ryan,
>>
>> The latest version of ElasticsearchIO (which will be included in Beam 2.2.0)
>> supports Elasticsearch 5.x.
>>
>> The client should be created in @Setup (or @StartBundle) and released
>> cleanly in @Teardown (or @FinishBundle). It is then used in @ProcessElement
>> to actually store the elements of the PCollection.
>>
>> Regards
>> JB
>>
>> On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>>> Hi JB,
>>> Thanks for your input. I'm trying to update ElasticsearchIO, and
>>> hopefully learn a bit about Beam in the process. The documentation
>>> says ElasticsearchIO only works with ES 2.x, and I'm using ES 5.6. I'd
>>> prefer not to have two ES libs on my classpath if I can avoid it. I'm
>>> just getting started, so my pipeline is quite simple:
>>>
>>> pipeline.apply( "Raw Reader", reader )                     // read raw files
>>>     .apply( "Document Generator", ParDo.of( extractor ) )  // create document objects for ES insertion
>>>     .apply( "Elastic Writer", new ElasticWriter( ... ) );  // upload to ES
>>>
>>> public final class ElasticWriter
>>>     extends PTransform<PCollection<Document>, PDone> {
>>>
>>>   private static final Logger log =
>>>       LoggerFactory.getLogger( ElasticWriter.class );
>>>   private final String elasticurl;
>>>
>>>   public ElasticWriter( String url ) {
>>>     elasticurl = url;
>>>   }
>>>
>>>   @Override
>>>   public PDone expand( PCollection<Document> input ) {
>>>     input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>>>     return PDone.in( input.getPipeline() );
>>>   }
>>>
>>>   public static class WriteFn extends DoFn<Document, Void>
>>>       implements Serializable {
>>>
>>>     private transient RestHighLevelClient client;
>>>     private final String elasticurl;
>>>
>>>     public WriteFn( String elasticurl ) {
>>>       this.elasticurl = elasticurl;
>>>     }
>>>
>>>     @Setup
>>>     public void setup() {
>>>       log.debug( "******************** into WriteFn::setup" );
>>>       HttpHost elastic = HttpHost.create( elasticurl );
>>>       RestClientBuilder bldr = RestClient.builder( elastic );
>>>
>>>       // if this is uncommented, the program never exits
>>>       //client = new RestHighLevelClient( bldr.build() );
>>>     }
>>>
>>>     @Teardown
>>>     public void teardown() {
>>>       log.debug( "******************** into WriteFn::teardown" );
>>>       // there's nothing to tear down
>>>     }
>>>
>>>     @ProcessElement
>>>     public void pe( ProcessContext c ) {
>>>       Document doc = DocumentImpl.from( c.element() );
>>>       log.debug( "writing {} to elastic",
>>>           doc.getMetadata().first( Metadata.NAME ) );
>>>
>>>       // this is where I want to write to ES, but for now,
>>>       // just write a text file
>>>       ObjectMapper mpr = new ObjectMapper();
>>>
>>>       try ( Writer fos = new BufferedWriter( new FileWriter(
>>>           new File( "/tmp/writers",
>>>               doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
>>>         mpr.writeValue( fos, doc );
>>>       }
>>>       catch ( IOException ioe ) {
>>>         log.error( ioe.getLocalizedMessage(), ioe );
>>>       }
>>>     }
>>>   }
>>> }
>>>
>>> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>> Hi Ryan,
>>>>
>>>> Why don't you use the ElasticsearchIO for that?
>>>>
>>>> Anyway, can you share your pipeline where you have the ParDo calling
>>>> your DoFn?
>>>>
>>>> Thanks,
>>>> Regards
>>>> JB
>>>>
>>>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>>>> Hi List,
>>>>> I'm trying to write an updated Elasticsearch client using the
>>>>> newly published RestHighLevelClient class (with ES 5.6.0). I'm only
>>>>> interested in writes at this time, so I'm using the
>>>>> ElasticsearchIO.write() function as a model. I have a transient
>>>>> member named client. Here's the setup function for my DoFn:
>>>>>
>>>>> @Setup
>>>>> public void setup() {
>>>>>   HttpHost elastic = HttpHost.create( elasticurl );
>>>>>   RestClientBuilder bldr = RestClient.builder( elastic );
>>>>>   client = new RestHighLevelClient( bldr.build() );
>>>>> }
>>>>>
>>>>> If I run the code as shown, I eventually get the debug message:
>>>>> "Pipeline has terminated. Shutting down." but the program never
>>>>> exits. If I comment out the client assignment above, the pipeline
>>>>> behaves normally (but obviously, I can't write anything to ES).
>>>>>
>>>>> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>>>>>
>>>>> ry
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
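[Editor's note] The hang described in this thread is what you would expect when the low-level RestClient is never closed: its I/O threads are non-daemon, so they keep the JVM alive after the pipeline terminates, and the empty @Teardown above never releases them. Below is a minimal, self-contained sketch of the lifecycle pairing JB recommends. `FakeRestClient` and `WriteFnSketch` are hypothetical stand-ins (a thread pool standing in for the ES client's I/O reactor, plain methods standing in for Beam's @Setup/@ProcessElement/@Teardown), not the real Elasticsearch or Beam APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for the ES client: like the real RestClient, it owns
// non-daemon worker threads that keep the JVM alive until close().
class FakeRestClient implements AutoCloseable {
    private final ExecutorService ioThreads =
            Executors.newFixedThreadPool(2); // threads are non-daemon by default

    void index(String doc) {
        ioThreads.submit(() -> { /* pretend to send doc over HTTP */ });
    }

    boolean isClosed() {
        return ioThreads.isShutdown();
    }

    @Override
    public void close() {
        ioThreads.shutdown(); // without this, the JVM never exits
    }
}

// Mirrors the DoFn lifecycle JB describes: create the client in @Setup,
// use it in @ProcessElement, release it cleanly in @Teardown.
class WriteFnSketch {
    private transient FakeRestClient client;

    void setup() {                    // @Setup
        client = new FakeRestClient();
    }

    void processElement(String doc) { // @ProcessElement
        client.index(doc);
    }

    void teardown() {                 // @Teardown -- not "nothing to tear down"
        client.close();
    }

    FakeRestClient client() {
        return client;
    }
}

public class LifecycleDemo {
    public static void main(String[] args) {
        WriteFnSketch fn = new WriteFnSketch();
        fn.setup();
        fn.processElement("{\"name\":\"example\"}");
        fn.teardown();
        System.out.println("client closed: " + fn.client().isClosed());
    }
}
```

With the real 5.6 client, the equivalent of `close()` here would be closing the low-level `RestClient` built in @Setup (keeping a reference to it alongside the `RestHighLevelClient` wrapper); leaving @Teardown empty is precisely what prevents the JVM from exiting.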