Hi Etienne,
That was just the hint I needed! Until Beam 2.2 is released, my
solution is to keep a handle to the low level client, and close that
in the teardown method. This seems to solve the threading issue.
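
In case it helps anyone else, the pattern looks roughly like the sketch
below. It uses stand-in classes instead of the real Elasticsearch and Beam
types (StubLowLevelClient, StubHighLevelClient and WriteFnSketch are all
made up for illustration). It also assumes the hang comes from non-daemon
threads owned by the low level client, which is an assumption rather than
something confirmed in this thread.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TeardownSketch {

    // Hypothetical stand-in for the low level client: it owns a non-daemon
    // worker thread, which keeps the JVM alive until close() is called.
    static final class StubLowLevelClient implements Closeable {
        private final ExecutorService io = Executors.newSingleThreadExecutor();

        StubLowLevelClient() {
            io.execute(() -> { }); // start the worker thread
        }

        boolean isClosed() {
            return io.isShutdown();
        }

        @Override
        public void close() {
            io.shutdown(); // worker thread ends once its queue is drained
        }
    }

    // Hypothetical stand-in for the high level client: a wrapper with no
    // close() of its own (assumed), hence the need to keep both handles.
    static final class StubHighLevelClient {
        private final StubLowLevelClient delegate;
        private int written = 0;

        StubHighLevelClient(StubLowLevelClient delegate) {
            this.delegate = delegate;
        }

        void index(String doc) {
            if (delegate.isClosed()) {
                throw new IllegalStateException("client already closed");
            }
            written++;
        }

        int written() {
            return written;
        }
    }

    // Same shape as WriteFn; in a real DoFn these methods would carry
    // @Setup, @ProcessElement and @Teardown.
    static final class WriteFnSketch {
        private transient StubLowLevelClient lowLevel;
        private transient StubHighLevelClient client;

        void setup() {
            lowLevel = new StubLowLevelClient(); // keep this handle
            client = new StubHighLevelClient(lowLevel);
        }

        void processElement(String doc) {
            client.index(doc);
        }

        void teardown() {
            if (lowLevel != null) {
                lowLevel.close(); // release the client's threads
            }
        }

        boolean isClosed() {
            return lowLevel != null && lowLevel.isClosed();
        }

        int written() {
            return client.written();
        }
    }

    public static void main(String[] args) {
        WriteFnSketch fn = new WriteFnSketch();
        fn.setup();
        fn.processElement("{\"name\":\"a\"}");
        fn.processElement("{\"name\":\"b\"}");
        fn.teardown();
        System.out.println("written=" + fn.written() + " closed=" + fn.isClosed());
    }
}
```

If the teardown() call is removed from main, the stub's worker thread is
never shut down and the JVM stays up after main returns, which is the same
symptom I was seeing.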

ry

On Wed, Oct 25, 2017 at 6:09 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
> Hi,
>
> We used the low level REST client for ElasticsearchIO, even though the
> high level client now exists, for maintenance reasons. The IO production
> code is shared across all Elasticsearch versions because the low level
> REST client is compatible with all of them. That is not the case for the
> high level client, whose major version has to match the major version of
> the targeted ES cluster, which would force us to maintain a different IO
> version for each ES version.
>
> That is why IMHO it is not a good idea to use the high level Rest client in
> the ElasticsearchIO.
>
> Regards,
>
> Etienne Chauchot
>
>
>
> On 23/10/2017 at 23:21, Ryan Bobko wrote:
>>
>> Thanks Tim,
>> I believe I'm doing what Jean-Baptiste recommends, so I guess I'll
>> have a look at the snapshot and see what's different. I don't mind
>> waiting a bit if it means I don't have to duplicate working code.
>>
>> ry
>>
>> On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson
>> <timrobertson...@gmail.com> wrote:
>>>
>>> Hi Ryan,
>>>
>>> I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told
>>> 2.2.0 is expected within a couple of weeks.
>>> My work is only a proof of concept for now, but I put in 300M fairly small
>>> docs at around 100,000/sec on a 3 node cluster without any issue [1].
>>>
>>> Hope this helps,
>>> Tim
>>>
>>>
>>> [1]
>>>
>>> https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java
>>>
>>>
>>> On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>> wrote:
>>>>
>>>> Hi Ryan,
>>>>
>>>> The latest version of ElasticsearchIO (which will be included in Beam
>>>> 2.2.0) supports Elasticsearch 5.x.
>>>>
>>>> The client should be created in @Setup (or @StartBundle) and released
>>>> cleanly in @Teardown (or @FinishBundle). It is then used in
>>>> @ProcessElement to actually store the elements of the PCollection.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>>
>>>> On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>>>>>
>>>>> Hi JB,
>>>>> Thanks for your input. I'm trying to update ElasticsearchIO, and
>>>>> hopefully learn a bit about Beam in the process. The documentation
>>>>> says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
>>>>> prefer not to have two ES libs in my classpath if I can avoid it. I'm
>>>>> just getting started, so my pipeline is quite simple:
>>>>>
>>>>> pipeline.apply( "Raw Reader", reader ) // read raw files
>>>>>               // create my document objects for ES insertion
>>>>>               .apply( "Document Generator", ParDo.of( extractor ) )
>>>>>               // upload to ES
>>>>>               .apply( "Elastic Writer", new ElasticWriter( ... ) );
>>>>>
>>>>>
>>>>> public final class ElasticWriter
>>>>>     extends PTransform<PCollection<Document>, PDone> {
>>>>>
>>>>>     private static final Logger log =
>>>>>         LoggerFactory.getLogger( ElasticWriter.class );
>>>>>     private final String elasticurl;
>>>>>
>>>>>     public ElasticWriter( String url ) {
>>>>>       elasticurl = url;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public PDone expand( PCollection<Document> input ) {
>>>>>       input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>>>>>       return PDone.in( input.getPipeline() );
>>>>>     }
>>>>>
>>>>>     public static class WriteFn extends DoFn<Document, Void>
>>>>>         implements Serializable {
>>>>>
>>>>>       private transient RestHighLevelClient client;
>>>>>       private final String elasticurl;
>>>>>
>>>>>       public WriteFn( String elasticurl ) {
>>>>>         this.elasticurl = elasticurl;
>>>>>       }
>>>>>
>>>>>       @Setup
>>>>>       public void setup() {
>>>>>         log.debug( "******************** into WriteFn::setup" );
>>>>>         HttpHost elastic = HttpHost.create( elasticurl );
>>>>>         RestClientBuilder bldr = RestClient.builder( elastic );
>>>>>
>>>>>         // if this is uncommented, the program never exits
>>>>>         //client = new RestHighLevelClient( bldr.build() );
>>>>>       }
>>>>>
>>>>>       @Teardown
>>>>>       public void teardown() {
>>>>>         log.debug( "******************** into WriteFn::teardown" );
>>>>>         // there's nothing to tear down
>>>>>       }
>>>>>
>>>>>       @ProcessElement
>>>>>       public void pe( ProcessContext c ) {
>>>>>         Document doc = DocumentImpl.from( c.element() );
>>>>>         log.debug( "writing {} to elastic",
>>>>>             doc.getMetadata().first( Metadata.NAME ) );
>>>>>
>>>>>         // this is where I want to write to ES, but for now, just write
>>>>>         // a text file
>>>>>
>>>>>         ObjectMapper mpr = new ObjectMapper();
>>>>>
>>>>>         try ( Writer fos = new BufferedWriter( new FileWriter( new File(
>>>>>                 "/tmp/writers",
>>>>>                 doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
>>>>>           mpr.writeValue( fos, doc );
>>>>>         }
>>>>>         catch ( IOException ioe ) {
>>>>>           log.error( ioe.getLocalizedMessage(), ioe );
>>>>>         }
>>>>>       }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>> wrote:
>>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> Why don't you use the ElasticsearchIO for that?
>>>>>>
>>>>>> Anyway, can you share your pipeline where you have the ParDo calling
>>>>>> your DoFn?
>>>>>>
>>>>>> Thanks,
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>>
>>>>>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hi List,
>>>>>>> I'm trying to write an updated Elasticsearch client using the
>>>>>>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
>>>>>>> interested in writes at this time, so I'm using the
>>>>>>> ElasticsearchIO.write() function as a model. I have a transient
>>>>>>> member named client. Here's my setup function for my DoFn:
>>>>>>>
>>>>>>> @Setup
>>>>>>> public void setup() {
>>>>>>>      HttpHost elastic = HttpHost.create( elasticurl );
>>>>>>>      RestClientBuilder bldr = RestClient.builder( elastic );
>>>>>>>      client = new RestHighLevelClient( bldr.build() );
>>>>>>> }
>>>>>>>
>>>>>>> If I run the code as shown, I eventually get the debug message
>>>>>>> "Pipeline has terminated. Shutting down." but the program never
>>>>>>> exits. If I comment out the client assignment above, the pipeline
>>>>>>> behaves normally (but obviously, I can't write anything to ES).
>>>>>>>
>>>>>>> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>>>>>>>
>>>>>>> ry
>>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> jbono...@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>
>>>>
>>>
>>>
>
