You don't apply it as part of a PTransform. The class is loaded and
executed dynamically using the JVM's ServiceLoader[1]. So you need to make
sure that the class is on the workers' classpath and that it appears in one
of the META-INF/services files under the JvmInitializer service (typically
contained in the jar file that holds your class). The linked example isn't
great since it is in a test[2], but you should be able to use the same test
logic to verify that your class will be loaded dynamically.
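
For instance, you can mirror that test's logic to verify the registration
locally (a sketch; com.example.TrustStoreInitializer is a placeholder for
your own implementation):

import java.util.ServiceLoader;
import org.apache.beam.sdk.harness.JvmInitializer;

public class CheckInitializerRegistration {
    public static void main(String[] args) {
        // ServiceLoader scans every META-INF/services/org.apache.beam.sdk.harness.JvmInitializer
        // file on the classpath; if your class isn't printed here, the
        // workers won't discover it either.
        for (JvmInitializer initializer : ServiceLoader.load(JvmInitializer.class)) {
            System.out.println("Discovered: " + initializer.getClass().getName());
        }
    }
}

The services file itself is just a plain-text file named
META-INF/services/org.apache.beam.sdk.harness.JvmInitializer whose content
is the fully qualified name of your implementation, e.g.
com.example.TrustStoreInitializer.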

1: https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html
2: https://github.com/apache/beam/blob/7c80ecb8c354575e4332f0f1731f1b5a3f0c4362/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/JvmInitializersTest.java#L41

On Sun, May 17, 2020 at 2:59 PM Mohil Khare <[email protected]> wrote:

> Hello Luke,
>
> Thanks for your reply, and I apologize for the delayed response.
> Well, I already tried using JvmInitializer and for some reason it didn't
> work for me. It's quite possible that I was not using it correctly. Do you
> have any code snippets that show how to use it in a PTransform?
>
> My Elasticsearch PTransform looks like the following:
>
> class WriteToElasticSearch extends PTransform<PCollection<MyPojo>, PDone> {
>     // private static final Logger logger =
>     //     LoggerFactory.getLogger(WriteToElasticSearch.class);
>
>     private final String[] elasticsearchEndPoint;
>     private final String username;
>     private final String password;
>
>     WriteToElasticSearch(String[] elasticsearchEndPoint, String username,
>             String password) {
>         this.elasticsearchEndPoint = elasticsearchEndPoint;
>         this.username = username;
>         this.password = password;
>     }
>
>     @Override
>     public PDone expand(PCollection<MyPojo> input) {
>         input
>             .apply("Convert_PCollection<POJO> to PCollection<String>",
>                 new MyPojoToString())
>             // Re-window into the global window, emitting on every element.
>             .apply("Global_Window_Trigger_Write_With_Every_Element",
>                 Window.<String>into(new GlobalWindows())
>                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                     .withAllowedLateness(Duration.ZERO)
>                     .discardingFiredPanes())
>             .apply("Write_To_Elastic_Search",
>                 ElasticsearchIO.write().withConnectionConfiguration(
>                     ElasticsearchIO.ConnectionConfiguration
>                         .create(elasticsearchEndPoint, "index_write", "_doc")
>                         .withUsername(username)
>                         .withPassword(password)));
>         return PDone.in(input.getPipeline());
>     }
> }
>
>
> Thanks and Regards
> Mohil
>
> On Thu, Apr 9, 2020 at 2:02 PM Luke Cwik <[email protected]> wrote:
>
>> You should be able to use a JvmInitializer[1] to set any system
>> properties/configure the JVM trust store. Just make sure it's properly set
>> up in your META-INF/services.
>>
>> This is supported by Dataflow and all PortableRunners that use a separate
>> process/container for the worker.
>>
>> 1: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
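>>
>> For example, a minimal initializer along these lines (a sketch, untested;
>> the truststore path and password are placeholders, and @AutoService from
>> Google's auto-service library is just one way to generate the
>> META-INF/services entry):
>>
>> import com.google.auto.service.AutoService;
>> import org.apache.beam.sdk.harness.JvmInitializer;
>> import org.apache.beam.sdk.options.PipelineOptions;
>>
>> @AutoService(JvmInitializer.class)
>> public class TrustStoreInitializer implements JvmInitializer {
>>     @Override
>>     public void beforeProcessing(PipelineOptions options) {
>>         // Runs in each worker JVM before element processing starts.
>>         System.setProperty("javax.net.ssl.trustStore", "/tmp/truststore.jks");
>>         System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
>>     }
>> }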
>>
>> On Thu, Apr 9, 2020 at 10:02 AM Mohil Khare <[email protected]> wrote:
>>
>>> Hi Kenneth,
>>>
>>> Thanks for your reply. You are right, I also believe that this has more
>>> to do with Dataflow than Elasticsearch. I don't think the problem is the
>>> classpath, or Beam being unable to find a file on the classpath. The
>>> problem is how to set the worker VM's keystore and truststore with a
>>> self-signed root CA. Usually they contain all the trusted root CAs like
>>> Let's Encrypt, Symantec, etc. KafkaIO provides that via
>>> withConsumerFactoryFn(loadKafkaConfig), where you can do something like
>>> the following:
>>>
>>> private void loadKafkaConfig(Map<String, Object> config) {
>>>     readJKSFileFromGCS(this.gcsTruststorePath, "/tmp/kafka.client.truststore.jks");
>>>     readJKSFileFromGCS(this.gcsKeystorePath, "/tmp/kafka.client.keystore.jks");
>>>
>>>     config.put("ssl.truststore.location", "/tmp/kafka.client.truststore.jks");
>>>     config.put("ssl.truststore.password", "clientsecret");
>>>     config.put("ssl.keystore.location", "/tmp/kafka.client.keystore.jks");
>>>     config.put("ssl.keystore.password", "clientsecret");
>>>     config.put("ssl.key.password", "clientsecret");
>>> }
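>>>
>>> (For reference, that helper plugs into the read roughly like this; a
>>> sketch where bootstrapServers and topic are placeholders, and the factory
>>> fn returns the raw-bytes Consumer that KafkaIO reads from:)
>>>
>>> pipeline.apply(KafkaIO.<String, String>read()
>>>     .withBootstrapServers(bootstrapServers)
>>>     .withTopic(topic)
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializer(StringDeserializer.class)
>>>     .withConsumerFactoryFn(config -> {
>>>         loadKafkaConfig(config);              // add the SSL settings above
>>>         return new KafkaConsumer<>(config);   // Consumer<byte[], byte[]> for KafkaIO
>>>     }));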
>>>
>>>
>>> I was wondering if ElasticsearchIO could also provide similar support,
>>> where we can supply our self-signed root CA.
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>>
>>> On Tue, Apr 7, 2020 at 8:14 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Hi Mohil,
>>>>
>>>> Thanks for the detailed report. I think most people are at reduced
>>>> capacity right now. Filing a Jira would be helpful for tracking this.
>>>>
>>>> Since I am writing, I will add a quick guess, but we should move to
>>>> Jira. It seems this has more to do with Dataflow than Elasticsearch. The
>>>> default for staging files is to scan the classpath. To do more, or to fix
>>>> any problem with the autodetection, you will need to specify --filesToStage
>>>> on the command line or setFilesToStage in Java code. Am I correct that this
>>>> symptom is confirmed?
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Apr 6, 2020 at 5:04 PM Mohil Khare <[email protected]> wrote:
>>>>
>>>>> Any update on this? Shall I open a Jira for this support?
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>> On Sun, Mar 22, 2020 at 9:36 PM Mohil Khare <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>> This is Mohil from Prosimo, a small Bay Area-based stealth-mode
>>>>>> startup. We use Beam (version 2.19) with Google Dataflow in our
>>>>>> analytics pipeline, with Kafka and Pub/Sub as sources and GCS,
>>>>>> BigQuery, and Elasticsearch as our sinks.
>>>>>>
>>>>>> We want to use our private self-signed root CA for TLS connections
>>>>>> between our internal services, viz. Kafka, Elasticsearch, Beam, etc.
>>>>>> We were able to set up a secure TLS connection between Beam and Kafka
>>>>>> using a self-signed root certificate in keystore.jks and
>>>>>> truststore.jks, transferring them to the worker VMs running KafkaIO
>>>>>> via KafkaIO's read and its withConsumerFactoryFn().
>>>>>>
>>>>>> We want to do something similar with ElasticsearchIO: update the
>>>>>> worker VM's truststore with our self-signed root certificate so that
>>>>>> when ElasticsearchIO connects over HTTPS, it can connect successfully
>>>>>> without an SSL handshake failure. So far we haven't found any way to
>>>>>> do this with ElasticsearchIO. We tried various possible workarounds:
>>>>>>
>>>>>> 1. Using a JvmInitializer to initialize the JVM with the truststore
>>>>>> via System.setProperty for javax.net.ssl.trustStore.
>>>>>> 2. Transferring our jar to GCP's App Engine, starting the jar with
>>>>>> -Djavax.net.ssl.trustStore, and triggering the Beam job from there.
>>>>>> 3. Setting ElasticsearchIO's withTrustSelfSigned flag to true (I don't
>>>>>> think this works because, looking at the source code, it appears to
>>>>>> depend on keystorePath).
>>>>>>
>>>>>> But nothing worked. When we logged in to the worker VMs, it looked
>>>>>> like our truststore never made it there. All ElasticsearchIO
>>>>>> connections failed with the following exception:
>>>>>>
>>>>>> sun.security.validator.ValidatorException: PKIX path building failed:
>>>>>> sun.security.provider.certpath.SunCertPathBuilderException: unable to 
>>>>>> find
>>>>>> valid certification path to requested target
>>>>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>>
>>>>>>
>>>>>> Right now, to unblock ourselves, we have added a proxy with a Let's
>>>>>> Encrypt root CA between Beam and the Elasticsearch cluster, and Beam's
>>>>>> ElasticsearchIO connects successfully to the proxy using the Let's
>>>>>> Encrypt root certificate. We don't want to use the Let's Encrypt root
>>>>>> certificate for internal services as it expires every three months.
>>>>>> Is there a way, just like with KafkaIO, to use a self-signed root
>>>>>> certificate with ElasticsearchIO? Or is there a way to update the
>>>>>> Java cacerts on the worker VMs where the Beam job is running?
>>>>>>
>>>>>> Looking forward to some suggestions.
>>>>>>
>>>>>> Thanks and Regards
>>>>>> Mohil Khare
>>>>>>
>>>>>
