Thanks. Did you use these configurations in your Apache Beam application?
Does Apache Beam use the Confluent Schema Registry client internally?

Yohei Onishi


On Thu, May 9, 2019 at 1:12 PM Vishwas Bm <[email protected]> wrote:

> Hi Yohei,
>
> I tried this some time back with the direct runner and faced the same
> issue as in https://github.com/confluentinc/schema-registry/issues/943
> when interacting with TLS-enabled Kafka and Schema Registry.
>
> So I set the environment variable JAVA_TOOL_OPTIONS with the required
> properties, and then it worked.
>
> export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=client-truststore.jks
> -Djavax.net.ssl.trustStorePassword=client-password
> -Djavax.net.ssl.keyStore=client-keystore.jks
> -Djavax.net.ssl.keyStorePassword=client-password"
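>
> If you cannot control the JVM flags (e.g. on managed workers), a rough
> equivalent is to set the same system properties from your own code before
> the Schema Registry client is first used. An untested sketch, using the
> same property names and placeholder values as the flags above:
>
> // Must run before the first Schema Registry HTTPS call, because the
> // default SSLContext is initialized lazily from these properties.
> System.setProperty("javax.net.ssl.trustStore", "client-truststore.jks");
> System.setProperty("javax.net.ssl.trustStorePassword", "client-password");
> System.setProperty("javax.net.ssl.keyStore", "client-keystore.jks");
> System.setProperty("javax.net.ssl.keyStorePassword", "client-password");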
>
>
> Thanks & Regards,
>
> Vishwas
>
>
> On Thu, May 9, 2019 at 5:13 AM Yohei Onishi <[email protected]> wrote:
>
>> Thanks, but I already did that as mentioned.
>> It seems that Apache Beam does not support Schema Registry. I will post
>> about it once I have confirmed.
>>
>> On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <[email protected]> wrote:
>>
>>> I replied to the SO question with more details.
>>>
>>> The issue is that you're trying to load a truststore (file) on the VM
>>> which doesn't exist. You need to make that file accessible in some way. The
>>> other SO question (
>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1)
>>> has a code snippet where the user copies the truststore from GCS to a local
>>> tmp file path and then configures the map with that temp file path.
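>>>
>>> A minimal sketch of that copy step using Beam's FileSystems API (the GCS
>>> path is a placeholder and error handling is omitted):
>>>
>>> import java.io.InputStream;
>>> import java.nio.channels.Channels;
>>> import java.nio.file.Files;
>>> import java.nio.file.Path;
>>> import java.nio.file.StandardCopyOption;
>>> import org.apache.beam.sdk.io.FileSystems;
>>>
>>> // Copy the truststore from GCS to a local temp file on the worker,
>>> // then point the Kafka config at the local copy.
>>> Path local = Files.createTempFile("client-truststore", ".jks");
>>> try (InputStream in = Channels.newInputStream(FileSystems.open(
>>>     FileSystems.matchNewResource(
>>>         "gs://my-bucket/client-truststore.jks", false /* isDirectory */)))) {
>>>   Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING);
>>> }
>>> config.put("ssl.truststore.location", local.toString());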
>>>
>>> On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <[email protected]> wrote:
>>>
>>>> Can anyone tell me how the Schema Registry client in Apache Beam is
>>>> configured?
>>>> I tried to find it in the GitHub repo but was not able to.
>>>> https://github.com/apache/beam
>>>>
>>>> I want to confirm whether Apache Beam supports Schema Registry and does
>>>> not have the same issue the Confluent Schema Registry client has.
>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>
>>>> Yohei Onishi
>>>>
>>>>
>>>> On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am developing a GCP Cloud Dataflow job that uses a Kafka broker and
>>>>> Schema Registry. Our Kafka broker and Schema Registry require a TLS
>>>>> client certificate, and I am facing a connection issue with Schema
>>>>> Registry on deployment. Any suggestion is highly welcome.
>>>>>
>>>>> Here is what I do in the Dataflow job. First I create consumer
>>>>> properties with the TLS configuration:
>>>>>
>>>>> props.put("security.protocol", "SSL");
>>>>>> props.put("ssl.truststore.password", "aaa");
>>>>>> props.put("ssl.keystore.password", "bbb");
>>>>>> props.put("ssl.key.password", "ccc"));
>>>>>> props.put("schema.registry.url", "https://host:port";)
>>>>>> props.put("specific.avro.reader", true);
>>>>>
>>>>>
>>>>> And I apply them to the pipeline with updateConsumerProperties:
>>>>>
>>>>> Pipeline p = Pipeline.create(options)
>>>>>> ...
>>>>>> .updateConsumerProperties(properties)
>>>>>> ...
>>>>>
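>>>>> For context, here is roughly how that fragment fits into KafkaIO
>>>>> (broker, topic, and deserializers below are placeholders, not the
>>>>> actual job):
>>>>>
>>>>>> Pipeline p = Pipeline.create(options);
>>>>>> p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>     .withBootstrapServers("broker:9093")            // placeholder
>>>>>>     .withTopic("my-topic")                          // placeholder
>>>>>>     .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>     .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>     .updateConsumerProperties(properties));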
>>>>>
>>>>> As this Stack Overflow answer suggests, I also download the keyStore and
>>>>> trustStore to a local directory and set the trustStore / keyStore
>>>>> locations on the consumer properties in the ConsumerFactory:
>>>>>
>>>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>>>>>
>>>>> Pipeline p = Pipeline.create(options)
>>>>>>  ...
>>>>>>  .withConsumerFactoryFn(new MyConsumerFactory(...))
>>>>>>  ...
>>>>>
>>>>>
>>>>> In ConsumerFactory:
>>>>>
>>>>>> public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>>>   // download keyStore and trustStore from GCS bucket
>>>>>>   config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>>>   config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>>>   return new KafkaConsumer<byte[], byte[]>(config);
>>>>>> }
>>>>>
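>>>>> For reference, withConsumerFactoryFn expects a
>>>>> SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>, so a
>>>>> self-contained sketch of the factory looks roughly like this (the
>>>>> downloadFromGcs helper is hypothetical, standing in for the GCS copy
>>>>> step):
>>>>>
>>>>>> class MyConsumerFactory implements
>>>>>>     SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>>>>>   @Override
>>>>>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>>>     // Copy the stores from GCS to local temp files on the worker.
>>>>>>     String localTrustStoreFilePath = downloadFromGcs("client-truststore.jks");
>>>>>>     String localKeyStoreFilePath = downloadFromGcs("client-keystore.jks");
>>>>>>     config.put("ssl.truststore.location", localTrustStoreFilePath);
>>>>>>     config.put("ssl.keystore.location", localKeyStoreFilePath);
>>>>>>     return new KafkaConsumer<byte[], byte[]>(config);
>>>>>>   }
>>>>>> }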
>>>>>
>>>>> With this code the deployment succeeded, but the Dataflow job got a TLS
>>>>> server certificate verification error:
>>>>>
>>>>>> Caused by: sun.security.validator.ValidatorException: PKIX path
>>>>>> building failed:
>>>>>> sun.security.provider.certpath.SunCertPathBuilderException: unable to
>>>>>> find valid certification path to requested target
>>>>>>         at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>>         at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>>>>>>         at sun.security.validator.Validator.validate(Validator.java:260)
>>>>>>         at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>>>>>>         at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>>>>>>         at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
>>>>>>         at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>>>>>>         at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>>>>>>         at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>>>>>>         at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>>>>>         at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>>>>>         at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>>>>>         at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>>>>>         at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>>>>>         at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>>>>>         at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>>>>>         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>>>>>         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>>>>>         at org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>>>>>         at org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>>>>>         at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
>>>>>>         at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>>>>>>         at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>>>>>>         at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>>>         at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>>>>>>         at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>>>>>>         at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>>>>>>         at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>
>>>>> Then I found that the Schema Registry client loads its TLS configuration
>>>>> from system properties.
>>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>>
>>>>> I tested a plain Kafka consumer with the same configuration and
>>>>> confirmed it works fine:
>>>>>
>>>>> props.put("schema.registry.url", "https://host:port";)
>>>>>> props.put("specific.avro.reader", true);
>>>>>> props.put("ssl.truststore.location",
>>>>>> System.getProperty("javax.net.ssl.trustStore"));
>>>>>> props.put("ssl.truststore.password",
>>>>>> System.getProperty("javax.net.ssl.keyStore"));
>>>>>> props.put("ssl.keystore.location",
>>>>>> System.getProperty("javax.net.ssl.keyStore"));
>>>>>> props.put("ssl.keystore.password",
>>>>>> System.getProperty("javax.net.ssl.keyStorePassword"));
>>>>>> props.put("ssl.key.password",
>>>>>> System.getProperty("javax.net.ssl.key.password"));
>>>>>
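>>>>> (A minimal standalone version of that check; bootstrap server, group id,
>>>>> and topic are placeholders, and a Kafka 2.x client is assumed:)
>>>>>
>>>>>> props.put("bootstrap.servers", "broker:9093");   // placeholder
>>>>>> props.put("group.id", "tls-smoke-test");         // placeholder
>>>>>> props.put("key.deserializer",
>>>>>> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>>>>>> props.put("value.deserializer",
>>>>>> "io.confluent.kafka.serializers.KafkaAvroDeserializer");
>>>>>> KafkaConsumer<byte[], Object> consumer = new KafkaConsumer<>(props);
>>>>>> consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder
>>>>>> // The first poll fetches the writer schema from Schema Registry over
>>>>>> // HTTPS, which is exactly the connection under test.
>>>>>> System.out.println(consumer.poll(Duration.ofSeconds(10)).count());
>>>>>> consumer.close();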
>>>>>
>>>>> Next I applied the same approach to the Dataflow job code, i.e. I set
>>>>> the same TLS configuration both as system properties and on the consumer
>>>>> properties.
>>>>>
>>>>> I specified the passwords as system properties when launching the
>>>>> application.
>>>>>
>>>>> -Djavax.net.ssl.keyStorePassword=aaa \
>>>>>> -Djavax.net.ssl.key.password=bbb \
>>>>>> -Djavax.net.ssl.trustStorePassword=ccc \
>>>>>
>>>>>
>>>>> Note: I set the system properties for the trustStore and keyStore
>>>>> locations in the ConsumerFactory, since those files are downloaded to a
>>>>> local temp directory there.
>>>>>
>>>>> config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
>>>>>> config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
>>>>>> System.setProperty("javax.net.ssl.trustStore",
>>>>>> localTrustStoreFilePath)
>>>>>> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)
>>>>>
>>>>>
>>>>> But this time even the deployment failed, with the following error.
>>>>>
>>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to
>>>>>> construct instance from factory method DataflowRunner#fromOptions(interface
>>>>>> org.apache.beam.sdk.options.PipelineOptions)
>>>>>>         at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>>>>> ...
>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> ...
>>>>>> Caused by: java.lang.IllegalArgumentException: DataflowRunner requires
>>>>>> gcpTempLocation, but failed to retrieve a value from PipelineOptions
>>>>>>         at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
>>>>>> Caused by: java.lang.IllegalArgumentException: Error constructing
>>>>>> default value for gcpTempLocation: tempLocation is not a valid GCS path,
>>>>>> gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
>>>>>>         at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
>>>>>> ...
>>>>>> Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket
>>>>>> gs://dev-k8s-rfid-store-dataflow exists.
>>>>>>         at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
>>>>>> ...
>>>>>> Caused by: java.io.IOException: Error getting access token for service
>>>>>> account: java.security.NoSuchAlgorithmException: Error constructing
>>>>>> implementation (algorithm: Default, provider: SunJSSE, class:
>>>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>         at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
>>>>>> ...
>>>>>> Caused by: java.net.SocketException:
>>>>>> java.security.NoSuchAlgorithmException: Error constructing implementation
>>>>>> (algorithm: Default, provider: SunJSSE, class:
>>>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>         at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
>>>>>> ...
>>>>>> Caused by: java.security.NoSuchAlgorithmException: Error constructing
>>>>>> implementation (algorithm: Default, provider: SunJSSE, class:
>>>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>         at java.security.Provider$Service.newInstance(Provider.java:1617)
>>>>>> ...
>>>>>
>>>>>
>>>>> Am I missing something?
>>>>>
>>>>> I posted the same question here
>>>>> https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi
>>>>>
>>>>> Thank you.
>>>>> Yohei Onishi
>>>>>
>>>> --
>> Yohei Onishi
>>
>
