Hello Luke,

Thanks for your reply, and apologies for the late response.
I already tried using JvmInitializer, but for some reason it didn't
work for me. It is quite possible that I was not using it correctly. Do you have
any code snippets that show how to use it with a PTransform?
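
For reference, here is a minimal sketch of what such an initializer might look like. It is not a verified working setup. The names TrustStoreSetup and TrustStoreInitializer, the path /tmp/truststore.jks, and the password "changeit" are all hypothetical. The helper itself uses only the JDK; the Beam wiring is shown as a comment because it needs the Beam SDK on the classpath.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * JDK-only helper (hypothetical name) that points the JVM at a custom trust store
 * through the standard JSSE system properties.
 */
final class TrustStoreSetup {
    private TrustStoreSetup() {}

    /** Checks that the file is readable, then sets the two JSSE trust store properties. */
    static void install(Path trustStore, String password) {
        if (!Files.isReadable(trustStore)) {
            throw new IllegalStateException("Trust store not readable: " + trustStore);
        }
        System.setProperty("javax.net.ssl.trustStore", trustStore.toAbsolutePath().toString());
        System.setProperty("javax.net.ssl.trustStorePassword", password);
    }
}

// Assumed wiring into Beam (not compiled here, requires the Beam SDK):
//
//   public class TrustStoreInitializer implements JvmInitializer {
//       @Override
//       public void onStartup() {
//           // Assumption: the file already exists on the worker at this point,
//           // e.g. it was copied there before this call.
//           TrustStoreSetup.install(Paths.get("/tmp/truststore.jks"), "changeit");
//       }
//   }
//
// plus a resource file META-INF/services/org.apache.beam.sdk.harness.JvmInitializer
// whose single line is the fully qualified class name of TrustStoreInitializer.
```

One caveat with this approach: as far as I understand, these properties only take effect if they are set before the JVM creates its default SSLContext, so the initializer has to run before any TLS connection is opened.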

My Elasticsearch PTransform looks like the following:

class WriteToElasticSearch extends PTransform<PCollection<MyPojo>, PDone> {
    // private static final Logger logger =
    //     LoggerFactory.getLogger(WriteAppAccessLogToElasticSearch.class);
    private final String[] elasticsearchEndPoint;
    private final String username;
    private final String password;

    WriteToElasticSearch(String[] elasticsearchEndPoint, String username, String password) {
        this.elasticsearchEndPoint = elasticsearchEndPoint;
        this.username = username;
        this.password = password;
    }

    @Override
    public PDone expand(PCollection<MyPojo> input) {
        input
            .apply("Convert_PCollection<POJO> to PCollection<String>", new MyPojoToString())
            // Fire a pane for every element so each one is written immediately.
            .apply("Global_Window_Trigger_Write_With_Every_Element",
                Window.<String>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes())
            .apply("Write_To_Elastic_Search",
                ElasticsearchIO.write().withConnectionConfiguration(
                    ElasticsearchIO.ConnectionConfiguration
                        .create(elasticsearchEndPoint, "index_write", "_doc")
                        .withUsername(username)
                        .withPassword(password)));
        return PDone.in(input.getPipeline());
    }
}
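
As an alternative to JVM-wide system properties, a trust store can also be loaded explicitly into an SSLContext. The sketch below uses only the JDK. The class name CustomTrust is hypothetical, and whether ElasticsearchIO's HTTP client would pick up such a context (for example via SSLContext.setDefault) is an assumption I have not verified.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** JDK-only helper (hypothetical name) that builds an SSLContext from a trust store file. */
final class CustomTrust {
    private CustomTrust() {}

    /** Loads the key store at the given path and returns a TLS context that trusts its entries. */
    static SSLContext contextFrom(Path trustStore, char[] password) throws Exception {
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = Files.newInputStream(trustStore)) {
            ks.load(in, password);
        }
        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(ks);

        // No key managers (no client certificate), default SecureRandom.
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, tmf.getTrustManagers(), null);
        return ctx;
    }
}
```

A caller could then pass the result to SSLContext.setDefault(...) early in worker startup. That only affects code that asks for the JVM default context afterwards.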


Thanks and Regards
Mohil

On Thu, Apr 9, 2020 at 2:02 PM Luke Cwik <[email protected]> wrote:

> You should be able to use a JvmInitializer[1] to set any system
> properties/configure the JVM trust store. Just make sure it's properly set
> up in your META-INF/services.
>
> This is supported by Dataflow and all PortableRunners that use a separate
> process/container for the worker.
>
> 1:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
>
> On Thu, Apr 9, 2020 at 10:02 AM Mohil Khare <[email protected]> wrote:
>
>> Hi Kenneth,
>>
>> Thanks for your reply. You are right, I also believe that this has more
>> to do with Dataflow than Elasticsearch. I don't think the problem is in the
>> classpath or Beam being unable to find a file on the classpath. The problem
>> is how to set the worker VM's keystore and truststore with a self-signed
>> root CA. Usually they contain all trusted root CAs like Let's Encrypt,
>> Symantec etc. KafkaIO provides that via
>> withConsumerFactoryFn(loadKafkaConfig), where you can do something like the
>> following:
>>
>> private void loadKafkaConfig(Map<String, Object> config) {
>>
>>     readJKSFileFromGCS(this.gcsTruststorePath, "/tmp/kafka.client.truststore.jks");
>>     readJKSFileFromGCS(this.gcsKeystorePath, "/tmp/kafka.client.keystore.jks");
>>
>>
>>     config.put("ssl.truststore.location","/tmp/kafka.client.truststore.jks");
>>     config.put("ssl.truststore.password","clientsecret");
>>     config.put("ssl.keystore.location","/tmp/kafka.client.keystore.jks");
>>     config.put("ssl.keystore.password","clientsecret");
>>     config.put("ssl.key.password","clientsecret");
>> }
>>
>>
>> I was wondering if ElasticsearchIO can also provide similar support where
>> we can supply our self-signed root CA.
>>
>> Thanks and Regards
>> Mohil
>>
>>
>> On Tue, Apr 7, 2020 at 8:14 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> Hi Mohil,
>>>
>>> Thanks for the detailed report. I think most people are at reduced
>>> capacity right now. Filing a Jira would be helpful for tracking this.
>>>
>>> Since I am writing, I will add a quick guess, but we should move to
>>> Jira. It seems this has more to do with Dataflow than ElasticSearch. The
>>> default for staging files is to scan the classpath. To do more, or to fix
>>> any problem with the autodetection, you will need to specify --filesToStage
>>> on the command line or setFilesToStage in Java code. Am I correct that this
>>> symptom is confirmed?
>>>
>>> Kenn
>>>
>>> On Mon, Apr 6, 2020 at 5:04 PM Mohil Khare <[email protected]> wrote:
>>>
>>>> Any update on this? Should I open a Jira for this support?
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Sun, Mar 22, 2020 at 9:36 PM Mohil Khare <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>> This is Mohil from Prosimo, a small Bay Area based stealth-mode
>>>>> startup. We use Beam (version 2.19) with Google Dataflow in our
>>>>> analytics pipeline, with Kafka and PubSub as sources and GCS, BigQuery
>>>>> and Elasticsearch as sinks.
>>>>>
>>>>> We want to use our private self-signed root CA for TLS connections
>>>>> between our internal services, viz. Kafka, Elasticsearch, Beam etc. We
>>>>> are able to set up a secure TLS connection between Beam and Kafka using
>>>>> the self-signed root certificate in keystore.jks and truststore.jks and
>>>>> transferring them to the worker VMs running KafkaIO through KafkaIO's
>>>>> read via withConsumerFactoryFn().
>>>>>
>>>>> We want to do something similar with ElasticsearchIO: update its worker
>>>>> VM's truststore with our self-signed root certificate so that when
>>>>> ElasticsearchIO connects using HTTPS, it connects successfully without
>>>>> an SSL handshake failure. Currently we couldn't find any way to do so
>>>>> with ElasticsearchIO. We tried various possible workarounds, such as:
>>>>>
>>>>> 1. Using a JvmInitializer to initialise the JVM with the truststore via
>>>>> System.setProperty for javax.net.ssl.trustStore.
>>>>> 2. Transferring our jar to GCP's App Engine, starting the jar there with
>>>>> -Djavax.net.ssl.trustStore, and then triggering the Beam job from there.
>>>>> 3. Setting the ElasticsearchIO flag withTrustSelfSigned to true (I don't
>>>>> think it will work because, looking at the source code, it appears to
>>>>> depend on keystorePath).
>>>>>
>>>>> But nothing worked. When we logged in to the worker VMs, it looked like
>>>>> our truststore never made it to the worker VM. All ElasticsearchIO
>>>>> connections failed with the following exception:
>>>>>
>>>>> sun.security.validator.ValidatorException: PKIX path building failed:
>>>>> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
>>>>> valid certification path to requested target
>>>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>
>>>>>
>>>>> Right now, to unblock ourselves, we have added a proxy with a Let's
>>>>> Encrypt root CA between Beam and the Elasticsearch cluster, and Beam's
>>>>> ElasticsearchIO connects successfully to the proxy using the Let's
>>>>> Encrypt root certificate. We don't want to use a Let's Encrypt
>>>>> certificate for internal services as it expires every three months. Is
>>>>> there a way, just like with KafkaIO, to use a self-signed root
>>>>> certificate with ElasticsearchIO? Or is there a way to update the Java
>>>>> cacerts on the worker VMs where the Beam job is running?
>>>>>
>>>>> Looking forward to some suggestions soon.
>>>>>
>>>>> Thanks and Regards
>>>>> Mohil Khare
>>>>>
>>>>
