Hi Ryan,

I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster.  I am told
2.2.0 is expected within a couple of weeks.
My work is only a proof of concept for now, but I put in 300M fairly small
docs at around 100,000/sec on a 3-node cluster without any issue [1].
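
In case it's useful, the @Setup/@Teardown lifecycle JB describes below looks
roughly like this against the 5.6 high-level REST client. This is an untested
sketch; the class, index, and type names are illustrative, not taken from
ElasticsearchIO:

```java
import java.io.IOException;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

// Illustrative DoFn: JSON strings in, documents indexed into ES 5.6.
public class EsWriteFn extends DoFn<String, Void> {

  private final String elasticurl;
  private transient RestClient lowLevelClient;
  private transient RestHighLevelClient client;

  public EsWriteFn( String elasticurl ) {
    this.elasticurl = elasticurl;
  }

  @Setup
  public void setup() {
    lowLevelClient = RestClient.builder( HttpHost.create( elasticurl ) ).build();
    // In 5.6 the high-level client wraps, but does not own, the low-level one.
    client = new RestHighLevelClient( lowLevelClient );
  }

  @ProcessElement
  public void processElement( ProcessContext c ) throws IOException {
    // Index and type names here are placeholders.
    IndexRequest req = new IndexRequest( "myindex", "doc" )
        .source( c.element(), XContentType.JSON );
    client.index( req );
  }

  @Teardown
  public void teardown() throws IOException {
    // Closing the low-level client stops its non-daemon I/O threads;
    // leaving it open is what keeps the JVM from exiting.
    if ( lowLevelClient != null ) {
      lowLevelClient.close();
    }
  }
}
```

The teardown is the important part for the hang described below: the REST
client's worker threads are not daemon threads, so the JVM waits on them
until the client is closed.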

Hope this helps,
Tim


[1]
https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java


On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Ryan,
>
> the last version of ElasticsearchIO (that will be included in Beam 2.2.0)
> supports Elasticsearch 5.x.
>
> The client should be created in @Setup (or @StartBundle) and released
> cleanly in @Teardown (or @FinishBundle). It is then used in @ProcessElement
> to actually store the elements of the PCollection.
>
> Regards
> JB
>
>
> On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>
>> Hi JB,
>> Thanks for your input. I'm trying to update ElasticsearchIO, and
>> hopefully learn a bit about Beam in the process. The documentation
>> says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
>> prefer not to have two ES libs in my classpath if I can avoid it. I'm
>> just getting started, so my pipeline is quite simple:
>>
>> pipeline.apply( "Raw Reader", reader )                         // read raw files
>>         .apply( "Document Generator", ParDo.of( extractor ) )  // create my document objects for ES insertion
>>         .apply( "Elastic Writer", new ElasticWriter( ... ) );  // upload to ES
>>
>>
>> public final class ElasticWriter
>>     extends PTransform<PCollection<Document>, PDone> {
>>
>>   private static final Logger log
>>       = LoggerFactory.getLogger( ElasticWriter.class );
>>   private final String elasticurl;
>>
>>   public ElasticWriter( String url ) {
>>     elasticurl = url;
>>   }
>>
>>   @Override
>>   public PDone expand( PCollection<Document> input ) {
>>     input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>>     return PDone.in( input.getPipeline() );
>>   }
>>
>>   public static class WriteFn extends DoFn<Document, Void>
>>       implements Serializable {
>>
>>     private transient RestHighLevelClient client;
>>     private final String elasticurl;
>>
>>     public WriteFn( String elasticurl ) {
>>       this.elasticurl = elasticurl;
>>     }
>>
>>     @Setup
>>     public void setup() {
>>       log.debug( "******************** into WriteFn::setup" );
>>       HttpHost elastic = HttpHost.create( elasticurl );
>>       RestClientBuilder bldr = RestClient.builder( elastic );
>>
>>       // if this is uncommented, the program never exits
>>       //client = new RestHighLevelClient( bldr.build() );
>>     }
>>
>>     @Teardown
>>     public void teardown() {
>>       log.debug( "******************** into WriteFn::teardown" );
>>       // there's nothing to tear down
>>     }
>>
>>     @ProcessElement
>>     public void pe( ProcessContext c ) {
>>       Document doc = DocumentImpl.from( c.element() );
>>       log.debug( "writing {} to elastic",
>>           doc.getMetadata().first( Metadata.NAME ) );
>>
>>       // this is where I want to write to ES, but for now,
>>       // just write a text file
>>
>>       ObjectMapper mpr = new ObjectMapper();
>>
>>       try ( Writer fos = new BufferedWriter( new FileWriter(
>>           new File( "/tmp/writers",
>>               doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
>>         mpr.writeValue( fos, doc );
>>       }
>>       catch ( IOException ioe ) {
>>         log.error( ioe.getLocalizedMessage(), ioe );
>>       }
>>     }
>>   }
>> }
>>
>>
>> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> Why don't you use the ElasticsearchIO for that ?
>>>
>>> Anyway, can you share your pipeline where you have the ParDo calling your
>>> DoFn ?
>>>
>>> Thanks,
>>> Regards
>>> JB
>>>
>>>
>>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>>
>>>>
>>>> Hi List,
>>>> I'm trying to write an updated Elasticsearch client using the
>>>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
>>>> interested in writes at this time, so I'm using the
>>>> ElasticsearchIO.write() function as a model. I have a transient member
>>>> named client. Here's my setup function for my DoFn:
>>>>
>>>> @Setup
>>>> public void setup() {
>>>>     HttpHost elastic = HttpHost.create( elasticurl );
>>>>     RestClientBuilder bldr = RestClient.builder( elastic );
>>>>     client = new RestHighLevelClient( bldr.build() );
>>>> }
>>>>
>>>> If I run the code as shown, I eventually get the debug message:
>>>> "Pipeline has terminated. Shutting down." but the program never exits.
>>>> If I comment out the client assignment above, the pipeline behaves
>>>> normally (but obviously, I can't write anything to ES).
>>>>
>>>> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>>>>
>>>> ry
>>>>
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
