You don't apply it as part of a PTransform. The class is loaded and executed dynamically using the JVM's ServiceLoader[1], so you need to make sure that the class is on the workers' classpath and that it is listed in one of the META-INF/services files under the JvmInitializer service (typically in the jar file containing your class). This isn't a great example since it is in a test[2], but you should be able to use the same test logic to check whether your class would be loaded dynamically.
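For illustration, a stdlib-only sketch of what such an initializer's startup hook could do is below. The class name, trust store path, and password are placeholders; in a real pipeline this logic would live in a class implementing org.apache.beam.sdk.harness.JvmInitializer (its onStartup() method), registered by listing the fully qualified class name in META-INF/services/org.apache.beam.sdk.harness.JvmInitializer, or by annotating the class with @AutoService(JvmInitializer.class).

```java
// Stdlib-only sketch: the body of what a JvmInitializer's onStartup() could
// do. Beam imports are omitted so the snippet runs standalone; the trust
// store path and password below are placeholders.
public class TrustStoreInitializerSketch {

  // Runs once per worker JVM, before pipeline code executes: point every
  // HTTPS client in the process (ElasticsearchIO's REST client included)
  // at a custom trust store.
  public static void installTrustStore(String path, String password) {
    System.setProperty("javax.net.ssl.trustStore", path);
    System.setProperty("javax.net.ssl.trustStorePassword", password);
  }

  public static void main(String[] args) {
    installTrustStore("/tmp/custom.truststore.jks", "changeit");
    System.out.println(System.getProperty("javax.net.ssl.trustStore"));
  }
}
```

The key point is that these properties are read when the JVM's default SSLContext is first initialized, so they must be set before the first TLS connection is attempted, which is exactly the guarantee the onStartup() hook gives you.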
1: https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html
2: https://github.com/apache/beam/blob/7c80ecb8c354575e4332f0f1731f1b5a3f0c4362/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/JvmInitializersTest.java#L41

On Sun, May 17, 2020 at 2:59 PM Mohil Khare <[email protected]> wrote:

> Hello Luke,
>
> Thanks for your reply, and I apologize for the late response.
> Well, I already tried using JvmInitializer, and for some reason it didn't
> work for me. It's quite possible that I was not using it correctly. Do you
> have any code snippets that show how we can use it with a PTransform?
>
> My Elasticsearch PTransform looks like the following:
>
> class WriteToElasticSearch extends PTransform<PCollection<MyPojo>, PDone> {
>     // private static final Logger logger =
>     //     LoggerFactory.getLogger(WriteAppAccessLogToElasticSearch.class);
>     private final String[] elasticsearchEndPoint;
>     private final String username;
>     private final String password;
>
>     WriteToElasticSearch(String[] elasticsearchEndPoint, String username,
>         String password) {
>       this.elasticsearchEndPoint = elasticsearchEndPoint;
>       this.username = username;
>       this.password = password;
>     }
>
>     @Override
>     public PDone expand(PCollection<MyPojo> input) {
>       input
>           .apply("Convert_PCollection<POJO> to PCollection<String>",
>               new MyPojoToString())
>           .apply("Global_Window_Trigger_Write_With_Every_Element",
>               Window.<String>into(new GlobalWindows())
>                   .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                   .discardingFiredPanes())
>           .apply("Write_To_Elastic_Search",
>               ElasticsearchIO.write().withConnectionConfiguration(
>                   ElasticsearchIO.ConnectionConfiguration.create(
>                       elasticsearchEndPoint, "index_write", "_doc")
>                       .withUsername(username).withPassword(password)));
>       return PDone.in(input.getPipeline());
>     }
> }
>
> Thanks and Regards
> Mohil
>
> On Thu, Apr 9, 2020 at 2:02 PM Luke Cwik <[email protected]> wrote:
>
>> You should be able to use a JvmInitializer[1] to set any system
>> properties/configure the JVM trust store. Just make sure it's properly
>> set up in your META-INF/services.
>>
>> This is supported by Dataflow and all PortableRunners that use a
>> separate process/container for the worker.
>>
>> 1: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
>>
>> On Thu, Apr 9, 2020 at 10:02 AM Mohil Khare <[email protected]> wrote:
>>
>>> Hi Kenneth,
>>>
>>> Thanks for your reply. You are right; I also believe that this has more
>>> to do with Dataflow than Elasticsearch. I don't think the problem is the
>>> classpath, or Beam being unable to find a file on the classpath. The
>>> problem is how to set the worker VM's keystore and truststore with a
>>> self-signed root CA. Usually they contain all the trusted root CAs like
>>> Let's Encrypt, Symantec, etc. KafkaIO provides that via
>>> withConsumerFactoryFn(loadKafkaConfig), where you can do something like
>>> the following:
>>>
>>> private void loadKafkaConfig(Map<String, Object> config) {
>>>     readJKSFileFromGCS(this.gcsTruststorePath,
>>>         "/tmp/kafka.client.truststore.jks");
>>>     readJKSFileFromGCS(this.gcsKeystorePath,
>>>         "/tmp/kafka.client.keystore.jks");
>>>
>>>     config.put("ssl.truststore.location", "/tmp/kafka.client.truststore.jks");
>>>     config.put("ssl.truststore.password", "clientsecret");
>>>     config.put("ssl.keystore.location", "/tmp/kafka.client.keystore.jks");
>>>     config.put("ssl.keystore.password", "clientsecret");
>>>     config.put("ssl.key.password", "clientsecret");
>>> }
>>>
>>> I was wondering if ElasticsearchIO could also provide similar support,
>>> where we can supply our self-signed root CA.
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>> On Tue, Apr 7, 2020 at 8:14 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Hi Mohil,
>>>>
>>>> Thanks for the detailed report. I think most people are at reduced
>>>> capacity right now. Filing a Jira would be helpful for tracking this.
>>>>
>>>> Since I am writing, I will add a quick guess, but we should move to
>>>> Jira. It seems this has more to do with Dataflow than Elasticsearch.
>>>> The default for staging files is to scan the classpath. To do more, or
>>>> to fix any problem with the autodetection, you will need to specify
>>>> --filesToStage on the command line or setFilesToStage in Java code. Am
>>>> I correct that this symptom is confirmed?
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Apr 6, 2020 at 5:04 PM Mohil Khare <[email protected]> wrote:
>>>>
>>>>> Any update on this? Shall I open a Jira for this support?
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>> On Sun, Mar 22, 2020 at 9:36 PM Mohil Khare <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>> This is Mohil from Prosimo, a small Bay Area based stealth-mode
>>>>>> startup. We use Beam (version 2.19) with Google Dataflow in our
>>>>>> analytics pipeline, with Kafka and Pub/Sub as sources and GCS,
>>>>>> BigQuery, and Elasticsearch as our sinks.
>>>>>>
>>>>>> We want to use our private self-signed root CA for TLS connections
>>>>>> between our internal services, viz. Kafka, Elasticsearch, Beam, etc.
>>>>>> We are able to set up a secure TLS connection between Beam and Kafka
>>>>>> using a self-signed root certificate in keystore.jks and
>>>>>> truststore.jks, transferring them to the worker VMs running KafkaIO
>>>>>> via KafkaIO's read with withConsumerFactoryFn().
>>>>>>
>>>>>> We want to do a similar thing with ElasticsearchIO, where we want to
>>>>>> update the worker VM's truststore with our self-signed root
>>>>>> certificate so that when ElasticsearchIO connects using HTTPS, it
>>>>>> can connect successfully without an SSL handshake failure. Currently
>>>>>> we couldn't find any way to do so with ElasticsearchIO. We tried
>>>>>> various possible workarounds:
>>>>>>
>>>>>> 1. Using a JvmInitializer to initialize the JVM truststore via
>>>>>>    System.setProperty for javax.net.ssl.trustStore.
>>>>>> 2. Transferring our jar to GCP's App Engine, where we start the jar
>>>>>>    with -Djavax.net.ssl.trustStore and then trigger the Beam job
>>>>>>    from there.
>>>>>> 3. Setting the ElasticsearchIO flag withTrustSelfSigned to true (I
>>>>>>    don't think it will work because, looking at the source code, it
>>>>>>    appears to have a dependency on keystorePath).
>>>>>>
>>>>>> But nothing worked. When we logged in to the worker VMs, it looked
>>>>>> like our truststore never made it to the worker VM. All
>>>>>> ElasticsearchIO connections failed with the following exception:
>>>>>>
>>>>>> sun.security.validator.ValidatorException: PKIX path building failed:
>>>>>> sun.security.provider.certpath.SunCertPathBuilderException: unable
>>>>>> to find valid certification path to requested target
>>>>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>>
>>>>>> Right now, to unblock ourselves, we have added a proxy with a Let's
>>>>>> Encrypt root CA between Beam and the Elasticsearch cluster, and
>>>>>> Beam's ElasticsearchIO connects successfully to the proxy using the
>>>>>> Let's Encrypt root certificate. We don't want to use the Let's
>>>>>> Encrypt root certificate for internal services, as it expires every
>>>>>> three months. Is there a way, as with KafkaIO, to use a self-signed
>>>>>> root certificate with ElasticsearchIO? Or is there a way to update
>>>>>> the Java cacerts on the worker VMs where the Beam job is running?
>>>>>>
>>>>>> Looking forward to some suggestions.
>>>>>>
>>>>>> Thanks and Regards
>>>>>> Mohil Khare
>>>>>
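As a closing note on the cacerts question in the original message: rather than replacing the JVM's cacerts file on the worker, a worker-side hook such as a JvmInitializer can build a trust store at runtime from a PEM certificate already copied to the worker (e.g. fetched from GCS, as in the KafkaIO workaround above) and point the JVM at it. A stdlib-only sketch; the class name, alias, paths, and passwords are all illustrative:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;

// Sketch: build a trust store containing a private self-signed root CA and
// make it the JVM-wide default. In a Beam pipeline this could run from
// JvmInitializer.onStartup() on each worker, after the PEM file has been
// copied from GCS to a local path.
public class SelfSignedTrustStore {

  // Parse a PEM file into X.509 certificates.
  public static List<X509Certificate> readPem(String pemPath) throws Exception {
    CertificateFactory cf = CertificateFactory.getInstance("X.509");
    List<X509Certificate> certs = new ArrayList<>();
    try (InputStream in = new FileInputStream(pemPath)) {
      for (Certificate c : cf.generateCertificates(in)) {
        certs.add((X509Certificate) c);
      }
    }
    return certs;
  }

  // Write the certificates into a fresh trust store and set the system
  // properties so later HTTPS connections (e.g. ElasticsearchIO's) trust the CA.
  public static void install(List<X509Certificate> roots, String storePath,
                             char[] password) throws Exception {
    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    ks.load(null, null); // initialize an empty store
    int i = 0;
    for (X509Certificate ca : roots) {
      ks.setCertificateEntry("private-root-ca-" + i++, ca);
    }
    try (FileOutputStream out = new FileOutputStream(storePath)) {
      ks.store(out, password);
    }
    System.setProperty("javax.net.ssl.trustStore", storePath);
    System.setProperty("javax.net.ssl.trustStorePassword", new String(password));
  }
}
```

Because javax.net.ssl.trustStore is read when the default SSLContext is first initialized, this must run before the worker opens its first TLS connection, which is why a JVM startup hook (rather than a DoFn) is the right place for it.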
