Thanks,

I explicitly set the file locations and passwords in JAVA_TOOL_OPTIONS and
in the Consumer Factory.
The Consumer Factory downloads the keyStore and trustStore from GCS to local
disk as described here:
https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
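
A minimal sketch of that staging step, under stated assumptions: the store
bytes arrive as an InputStream (the GCS read itself is not shown), and the
class and method names StoreStaging, stage and withStoreLocations are
hypothetical, not part of Beam or Kafka. It writes each store into a directory
created with Files.createTempDirectory and puts the resulting absolute paths
into the consumer config map.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

class StoreStaging {

    // Writes one store into a fresh temp directory and returns its local path.
    // The InputStream stands in for the GCS download, which is not shown here.
    static Path stage(InputStream storeBytes, String dirPrefix, String fileName)
            throws IOException {
        Path dir = Files.createTempDirectory(dirPrefix);
        Path target = dir.resolve(fileName);
        Files.copy(storeBytes, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    // Returns a copy of the consumer config that points at the staged files.
    static Map<String, Object> withStoreLocations(
            Map<String, Object> config, Path trustStore, Path keyStore) {
        Map<String, Object> updated = new HashMap<>(config);
        updated.put("ssl.truststore.location", trustStore.toAbsolutePath().toString());
        updated.put("ssl.keystore.location", keyStore.toAbsolutePath().toString());
        return updated;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder bytes; a real job would stream the .jks objects from GCS.
        Path trustStore = stage(new ByteArrayInputStream(new byte[] {1, 2, 3}),
                "trustStore", "client-truststore.jks");
        Path keyStore = stage(new ByteArrayInputStream(new byte[] {4, 5, 6}),
                "keyStore", "client-keystore.jks");
        Map<String, Object> config =
                withStoreLocations(new HashMap<>(), trustStore, keyStore);
        System.out.println(config.get("ssl.keystore.location"));
    }
}
```

The temp directory is local to the JVM that creates it, so in this sketch the
staging would have to run on every worker that builds a consumer, not only on
the machine that submits the job.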

Then I got a FileNotFoundException like the one below. It seems the Consumer
Factory does not work well. When I create a temporary directory for those
files using Files.createTempDirectory(prefix), job deployment works but access
to Schema Registry does not.

Caused by: java.security.PrivilegedActionException:
java.io.FileNotFoundException:
/tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
        at java.security.AccessController.doPrivileged(Native Method)
        at
sun.security.ssl.SSLContextImpl$DefaultManagersHolder.getKeyManagers(SSLContextImpl.java:928)
        at
sun.security.ssl.SSLContextImpl$DefaultManagersHolder.<clinit>(SSLContextImpl.java:864)
        at
sun.security.ssl.SSLContextImpl$DefaultSSLContext.<init>(SSLContextImpl.java:1019)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.security.Provider$Service.newInstance(Provider.java:1595)
        ... 42 more
Caused by: java.io.FileNotFoundException:
/tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at java.io.FileInputStream.<init>(FileInputStream.java:93)
        at
sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:932)
        at
sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:929)
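
The trace above shows the JVM's default SSLContext opening the file named by
the javax.net.ssl.keyStore system property, so that path has to exist in
whichever JVM first initializes default TLS. A small sketch of a check that
could run before any TLS connection is made; the class and method names
(TlsStoreCheck, missingStores) are hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class TlsStoreCheck {

    // System properties the default JSSE context reads for store locations.
    static final String[] STORE_PROPERTIES = {
        "javax.net.ssl.trustStore",
        "javax.net.ssl.keyStore"
    };

    // Returns "property=path" for every configured store that is not readable.
    static List<String> missingStores() {
        List<String> missing = new ArrayList<>();
        for (String name : STORE_PROPERTIES) {
            String value = System.getProperty(name);
            // Unset, empty, or "NONE" means no file is expected for this store.
            if (value == null || value.isEmpty() || "NONE".equals(value)) {
                continue;
            }
            if (!Files.isReadable(Paths.get(value))) {
                missing.add(name + "=" + value);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingStores();
        if (missing.isEmpty()) {
            System.out.println("All configured TLS stores are readable.");
        } else {
            System.out.println("Unreadable TLS stores: " + missing);
        }
    }
}
```

Run on the launcher and on a worker, a check like this would show which side
is pointing at a path such as /tmp/keyStore/... that was never created there.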

So I am also implementing a solution without Schema Registry.

Yohei Onishi


On Thu, May 9, 2019 at 5:12 PM Vishwas Bm <[email protected]> wrote:

> Hi Yohei,
>
> The configuration exposed as an environment variable was picked up by the
> Beam Java program.
>
> If you check the backtrace of the error, it has a method call to the
> Confluent Schema Registry client during deserialization of records:
>
>
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>
> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>
> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>
> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>
> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
> On Thu, May 9, 2019 at 12:32 PM Yohei Onishi <[email protected]> wrote:
>
>> Thanks. Did you use this configuration in your Apache Beam application?
>> Does Apache Beam use the Confluent Schema Registry client internally?
>>
>> Yohei Onishi
>>
>>
>> On Thu, May 9, 2019 at 1:12 PM Vishwas Bm <[email protected]> wrote:
>>
>>> Hi Yohei,
>>>
>>> I tried this some time back with the direct runner and faced the same
>>> issue as in https://github.com/confluentinc/schema-registry/issues/943
>>> when interacting with TLS-enabled Kafka and Schema Registry.
>>>
>>> So I set the environment variable JAVA_TOOL_OPTIONS with the
>>> required properties and then it worked.
>>>
>>> export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=client-truststore.jks
>>> -Djavax.net.ssl.trustStorePassword=client-password
>>> -Djavax.net.ssl.keyStore=client-keystore.jks
>>> -Djavax.net.ssl.keyStorePassword=client-password"
>>>
>>>
>>> *Thanks & Regards,*
>>>
>>> *Vishwas *
>>>
>>>
>>> On Thu, May 9, 2019 at 5:13 AM Yohei Onishi <[email protected]> wrote:
>>>
>>>> Thanks, but I already did that, as mentioned.
>>>> It seems that Apache Beam does not support Schema Registry. I will post
>>>> about it after I have confirmed.
>>>>
>>>> On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> I replied to the SO question with more details.
>>>>>
>>>>> The issue is that you're trying to load a truststore (file) on the VM
>>>>> which doesn't exist. You need to make that file accessible in some way.
>>>>> The other SO question (
>>>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1)
>>>>> has a code snippet where the user copies the truststore from GCS to a
>>>>> local tmp file path and then configures the map with that temp file path.
>>>>>
>>>>> On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Can anyone explain how the Schema Registry client in Apache Beam is
>>>>>> configured?
>>>>>> I tried to find it in the GitHub repo but was not able to.
>>>>>> https://github.com/apache/beam
>>>>>>
>>>>>> I want to check whether Apache Beam supports Schema Registry and does
>>>>>> not have the same issue Confluent Schema Registry has.
>>>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>>>
>>>>>> Yohei Onishi
>>>>>>
>>>>>>
>>>>>> On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am developing a GCP Cloud Dataflow job that uses a Kafka broker and
>>>>>>> Schema Registry. Our Kafka broker and Schema Registry require a TLS
>>>>>>> client certificate, and I am facing a connection issue with Schema
>>>>>>> Registry on deployment. Any suggestions are highly welcome.
>>>>>>>
>>>>>>> Here is what I do for the Dataflow job. I create Consumer Properties
>>>>>>> for the TLS configuration.
>>>>>>>
>>>>>>> props.put("security.protocol", "SSL");
>>>>>>>> props.put("ssl.truststore.password", "aaa");
>>>>>>>> props.put("ssl.keystore.password", "bbb");
>>>>>>>> props.put("ssl.key.password", "ccc");
>>>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>>>> props.put("specific.avro.reader", true);
>>>>>>>
>>>>>>>
>>>>>>> Then I apply the Consumer Properties with updateConsumerProperties.
>>>>>>>
>>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>>> ...
>>>>>>>> .updateConsumerProperties(properties)
>>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>> As this Stack Overflow answer suggests, I also download the keyStore
>>>>>>> and trustStore to a local directory and specify the trustStore /
>>>>>>> keyStore locations in the Consumer Properties in the ConsumerFactory.
>>>>>>>
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>>>>>>>
>>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>>>  ...
>>>>>>>>  .withConsumerFactoryFn(new MyConsumerFactory(...))
>>>>>>>>  ...
>>>>>>>
>>>>>>>
>>>>>>> In ConsumerFactory:
>>>>>>>
>>>>>>> public Consumer<byte[], byte[]> apply(Map<String, Object> config)  {
>>>>>>>>   // download keyStore and trustStore from GCS bucket
>>>>>>>>   config.put("ssl.truststore.location",
>>>>>>>> (Object)localTrustStoreFilePath);
>>>>>>>>   config.put("ssl.keystore.location", (Object)localKeyStoreFilePath);
>>>>>>>>   return new KafkaConsumer<byte[], byte[]>(config);
>>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> With this code the deployment succeeded, but the Dataflow job got a
>>>>>>> TLS server certificate verification error.
>>>>>>>
>>>>>>> Caused by: sun.security.validator.ValidatorException: PKIX path
>>>>>>>> building failed:
>>>>>>>> sun.security.provider.certpath.SunCertPathBuilderException: unable to
>>>>>>>> find valid certification path to requested target
>>>>>>>>
>>>>>>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>>>>
>>>>>>>> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>>>>>>>>
>>>>>>>> sun.security.validator.Validator.validate(Validator.java:260)
>>>>>>>>
>>>>>>>> sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>>>>>>>>
>>>>>>>>  
>>>>>>>> sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>>>>>>>>
>>>>>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
>>>>>>>>
>>>>>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>>>>>>>>
>>>>>>>> java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>>>>>>>>
>>>>>>>> sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>>>>>>>>
>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>>>>>>>
>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>>>>>>>
>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>>>>>>>
>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>>>>>>>
>>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>>>>>>>
>>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>>>>>>>
>>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>>>>>>>
>>>>>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>>>>>>>
>>>>>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>>>>>>>
>>>>>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>>>>>>>
>>>>>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>>>>>>>
>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>>>>>>>
>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>>>>>>>>
>>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>>>>>>>>
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>         java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>
>>>>>>> Then I found that the Schema Registry client loads its TLS
>>>>>>> configuration from system properties.
>>>>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>>>>
>>>>>>> I tested Kafka Consumer with the same configuration, and I confirmed
>>>>>>> it works fine.
>>>>>>>
>>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>>>> props.put("specific.avro.reader", true);
>>>>>>>> props.put("ssl.truststore.location",
>>>>>>>> System.getProperty("javax.net.ssl.trustStore"));
>>>>>>>> props.put("ssl.truststore.password",
>>>>>>>> System.getProperty("javax.net.ssl.trustStorePassword"));
>>>>>>>> props.put("ssl.keystore.location",
>>>>>>>> System.getProperty("javax.net.ssl.keyStore"));
>>>>>>>> props.put("ssl.keystore.password",
>>>>>>>> System.getProperty("javax.net.ssl.keyStorePassword"));
>>>>>>>> props.put("ssl.key.password",
>>>>>>>> System.getProperty("javax.net.ssl.key.password"));
>>>>>>>
>>>>>>>
>>>>>>> Next I applied the same approach to the Dataflow job code, that is, I
>>>>>>> applied the same TLS configuration to both the system properties and
>>>>>>> the Consumer Properties.
>>>>>>>
>>>>>>> I specified the passwords via system properties when executing the
>>>>>>> application.
>>>>>>>
>>>>>>> -Djavax.net.ssl.keyStorePassword=aaa \
>>>>>>>> -Djavax.net.ssl.key.password=bbb \
>>>>>>>> -Djavax.net.ssl.trustStorePassword=ccc \
>>>>>>>
>>>>>>>
>>>>>>> Note: I set the system properties for the trustStore and keyStore
>>>>>>> locations in the Consumer Factory, since those files are downloaded to
>>>>>>> a local temp directory.
>>>>>>>
>>>>>>> config.put("ssl.truststore.location",
>>>>>>>> (Object)localTrustStoreFilePath);
>>>>>>>> config.put("ssl.keystore.location", (Object)localKeyStoreFilePath);
>>>>>>>> System.setProperty("javax.net.ssl.trustStore",
>>>>>>>> localTrustStoreFilePath);
>>>>>>>> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);
>>>>>>>
>>>>>>>
>>>>>>> But then even deployment failed with a timeout error.
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to
>>>>>>>> construct instance from factory method 
>>>>>>>> DataflowRunner#fromOptions(interface
>>>>>>>> org.apache.beam.sdk.options.PipelineOptions)
>>>>>>>>         at
>>>>>>>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>>>>>>> ...
>>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>>>> Method)
>>>>>>>> ...
>>>>>>>> Caused by: java.lang.IllegalArgumentException: DataflowRunner
>>>>>>>> requires gcpTempLocation, but failed to retrieve a value from
>>>>>>>> PipelineOptions
>>>>>>>>         at
>>>>>>>> org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Error constructing
>>>>>>>> default value for gcpTempLocation: tempLocation is not a valid GCS path,
>>>>>>>> gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
>>>>>>>>         at
>>>>>>>> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
>>>>>>>> ...
>>>>>>>> Caused by: java.lang.RuntimeException: Unable to verify that GCS
>>>>>>>> bucket gs://dev-k8s-rfid-store-dataflow exists.
>>>>>>>>         at
>>>>>>>> org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
>>>>>>>> ...
>>>>>>>> Caused by: java.io.IOException: Error getting access token for
>>>>>>>> service account: java.security.NoSuchAlgorithmException: Error
>>>>>>>> constructing implementation (algorithm: Default, provider: SunJSSE,
>>>>>>>> class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>>         at
>>>>>>>> com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
>>>>>>>> ...
>>>>>>>> Caused by: java.net.SocketException:
>>>>>>>> java.security.NoSuchAlgorithmException: Error constructing
>>>>>>>> implementation (algorithm: Default, provider: SunJSSE, class:
>>>>>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>>         at
>>>>>>>> javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
>>>>>>>> ...
>>>>>>>> Caused by: java.security.NoSuchAlgorithmException: Error
>>>>>>>> constructing implementation (algorithm: Default, provider: SunJSSE,
>>>>>>>> class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>>         at
>>>>>>>> java.security.Provider$Service.newInstance(Provider.java:1617)
>>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>> Am I missing something?
>>>>>>>
>>>>>>> I posted the same question here
>>>>>>> https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi
>>>>>>>
>>>>>>> Thank you.
>>>>>>> Yohei Onishi
>>>>>>>
>>>>>> --
>>>> Yohei Onishi
>>>>
>>>
