Hi Yohei,

The configuration exposed as an environment variable was picked up by the Beam Java program.
If you check the backtrace of the error, it has method calls into the Confluent Schema Registry client during deserialization of records:

io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)

*Thanks & Regards,*

*Vishwas *

On Thu, May 9, 2019 at 12:32 PM Yohei Onishi <[email protected]> wrote:

> Thanks. Did you use these configurations in your Apache Beam application?
> Does Apache Beam use the Confluent Schema Registry client internally?
>
> Yohei Onishi
>
> On Thu, May 9, 2019 at 1:12 PM Vishwas Bm <[email protected]> wrote:
>
>> Hi Yohei,
>>
>> I had tried some time back with the direct runner and faced the same issue
>> as in https://github.com/confluentinc/schema-registry/issues/943 when
>> interacting with TLS-enabled Kafka and Schema Registry.
>>
>> So I set the environment variable JAVA_TOOL_OPTIONS with the required
>> properties, and then it worked:
>>
>> export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=client-truststore.jks
>> -Djavax.net.ssl.trustStorePassword=client-password
>> -Djavax.net.ssl.keyStore=client-keystore.jks
>> -Djavax.net.ssl.keyStorePassword=client-password"
>>
>> *Thanks & Regards,*
>>
>> *Vishwas *
>>
>> On Thu, May 9, 2019 at 5:13 AM Yohei Onishi <[email protected]> wrote:
>>
>>> Thanks, but I already did that, as mentioned.
>>> It seems that Apache Beam does not support Schema Registry. I will post
>>> about it once I have confirmed.
>>>
>>> On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> I replied to the SO question with more details.
>>>>
>>>> The issue is that you're trying to load a truststore (file) on the VM
>>>> which doesn't exist. You need to make that file accessible in some way.
>>>> The other SO question
>>>> (https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1)
>>>> has a code snippet where the user copies the truststore from GCS to a
>>>> local tmp file path and then configures the map with that temp file path.
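A minimal sketch of that staging approach, for reference. This is not the snippet from the linked answer: the bucket name, object path, and class name below are hypothetical, and it assumes the google-cloud-storage client library is available on the worker.

    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TrustStoreConsumerFactory
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

      @Override
      public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
        try {
          // Stage the truststore from GCS onto the worker VM's local disk.
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get("my-bucket", "certs/client-truststore.jks"); // hypothetical path
          Path local = Files.createTempFile("client-truststore", ".jks");
          blob.downloadTo(local);

          // Point the Kafka consumer at the staged local copy.
          config.put("ssl.truststore.location", local.toAbsolutePath().toString());
          return new KafkaConsumer<>(config);
        } catch (Exception e) {
          throw new RuntimeException("Failed to stage truststore from GCS", e);
        }
      }
    }

The factory runs on each worker, which is the point: the file has to exist on the VM that actually opens the TLS connection, not on the machine that launched the job.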
>>>> On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <[email protected]> wrote:
>>>>
>>>>> Can anyone tell me how the Schema Registry client in Apache Beam is
>>>>> configured?
>>>>> I tried to find it in the GitHub repo but was not able to:
>>>>> https://github.com/apache/beam
>>>>>
>>>>> I want to make sure that Apache Beam supports Schema Registry and does
>>>>> not have the same issue that the Confluent Schema Registry client has:
>>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>>
>>>>> Yohei Onishi
>>>>>
>>>>> On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am developing a GCP Cloud Dataflow job that uses a Kafka broker and
>>>>>> Schema Registry. Our Kafka broker and Schema Registry require a TLS
>>>>>> client certificate, and I am facing a connection issue with Schema
>>>>>> Registry on deployment. Any suggestion is highly welcome.
>>>>>>
>>>>>> Here is what I do for the Dataflow job. I create consumer properties
>>>>>> for the TLS configuration:
>>>>>>
>>>>>>> props.put("security.protocol", "SSL");
>>>>>>> props.put("ssl.truststore.password", "aaa");
>>>>>>> props.put("ssl.keystore.password", "bbb");
>>>>>>> props.put("ssl.key.password", "ccc");
>>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>>> props.put("specific.avro.reader", true);
>>>>>>
>>>>>> And I update the consumer properties with updateConsumerProperties:
>>>>>>
>>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>>     ...
>>>>>>>     .updateConsumerProperties(properties)
>>>>>>>     ...
>>>>>>
>>>>>> As this Stack Overflow answer suggests, I also download the keystore
>>>>>> and truststore to a local directory and set the truststore / keystore
>>>>>> locations on the consumer properties in the ConsumerFactory:
>>>>>>
>>>>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>>>>>>
>>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>>     ...
>>>>>>>     .withConsumerFactoryFn(new MyConsumerFactory(...))
>>>>>>>     ...
>>>>>>
>>>>>> In the ConsumerFactory:
>>>>>>
>>>>>>> public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>>>>   // download keyStore and trustStore from GCS bucket
>>>>>>>   config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>>>>   config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>>>>   return new KafkaConsumer<byte[], byte[]>(config);
>>>>>>> }
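For context, here is roughly how those fragments could wire together; a sketch only, in which the topic, bootstrap servers, and the no-arg MyConsumerFactory are placeholders, and "properties" is the TLS / registry map built above.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Pipeline p = Pipeline.create(options);
    p.apply(KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("broker-host:9093")          // placeholder
        .withTopic("epc-transactions")                     // placeholder
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .updateConsumerProperties(properties)              // the TLS + schema.registry.* map
        .withConsumerFactoryFn(new MyConsumerFactory()));  // stages the stores locally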
>>>>>> With this code I succeeded in deployment, but the Dataflow job got a
>>>>>> TLS server certificate verification error:
>>>>>>
>>>>>>> Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>>>>>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>>> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>>>>>>> sun.security.validator.Validator.validate(Validator.java:260)
>>>>>>> sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>>>>>>> sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>>>>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
>>>>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>>>>>>> java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>>>>>>> sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>>>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>>>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>>>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>>>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>>>>>> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>> java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Then I found that the Schema Registry client loads its TLS
>>>>>> configuration from system properties:
>>>>>> https://github.com/confluentinc/schema-registry/issues/943
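The HttpsURLConnectionImpl frames in the middle of that trace also suggest why the system properties matter here: the registry client's REST call goes through a plain HTTPS URL connection, and a connection opened without an explicit SSLSocketFactory validates the server against the JVM-default SSLContext, which is built from the javax.net.ssl.* system properties; the ssl.* consumer properties never reach that layer. A small stand-alone probe of that default behavior (the URL is a placeholder):

    import java.net.URL;
    import javax.net.ssl.HttpsURLConnection;

    public class DefaultSslContextProbe {
      public static void main(String[] args) throws Exception {
        // Run with -Djavax.net.ssl.trustStore=... / -Djavax.net.ssl.trustStorePassword=...
        // and a private CA is trusted; run without them and this fails with the
        // same PKIX path-building error as above.
        URL url = new URL("https://schema-registry-host:8081/schemas/ids/1");
        HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
        // No setSSLSocketFactory(...) call: server verification uses the
        // JVM-default truststore, not any ssl.truststore.* consumer property.
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }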
>>>>>> I tested a Kafka consumer with the same configuration and confirmed
>>>>>> that it works fine:
>>>>>>
>>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>>> props.put("specific.avro.reader", true);
>>>>>>> props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
>>>>>>> props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.trustStorePassword"));
>>>>>>> props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
>>>>>>> props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
>>>>>>> props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));
>>>>>>
>>>>>> Next I applied the same approach to the Dataflow job code, that is, I
>>>>>> applied the same TLS configuration to both the system properties and
>>>>>> the consumer properties.
>>>>>>
>>>>>> I specified the passwords via system properties when executing the
>>>>>> application:
>>>>>>
>>>>>>> -Djavax.net.ssl.keyStorePassword=aaa \
>>>>>>> -Djavax.net.ssl.key.password=bbb \
>>>>>>> -Djavax.net.ssl.trustStorePassword=ccc \
>>>>>>
>>>>>> Note: I set the system properties for the trustStore and keyStore
>>>>>> locations in the ConsumerFactory, since those files are downloaded to
>>>>>> a local temp directory:
>>>>>>
>>>>>>> config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>>>> config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>>>> System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath);
>>>>>>> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);
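Taken together, the fragments above amount to a factory apply() roughly like this; a sketch, assuming the local paths come from the GCS download step described earlier and the passwords arrive via the -D flags:

    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
      // Local copies staged from GCS earlier in this method (elided).
      config.put("ssl.truststore.location", localTrustStoreFilePath);
      config.put("ssl.keystore.location", localKeyStoreFilePath);
      // Mirror the locations into system properties so the JVM-default
      // SSLContext used by the Schema Registry client can find them.
      System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath);
      System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);
      return new KafkaConsumer<>(config);
    }

Note that System.setProperty here takes effect in the worker JVM where the factory runs; the -D flags above are passed to the JVM that launches the job, which is a different process.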
>>>>>> But even the deployment failed, with a timeout error:
>>>>>>
>>>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
>>>>>>>         at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>>>>>>         ...
>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>         ...
>>>>>>> Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
>>>>>>>         at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
>>>>>>> Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
>>>>>>>         at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
>>>>>>>         ...
>>>>>>> Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
>>>>>>>         at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
>>>>>>>         ...
>>>>>>> Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>         at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
>>>>>>>         ...
>>>>>>> Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>         at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
>>>>>>>         ...
>>>>>>> Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>         at java.security.Provider$Service.newInstance(Provider.java:1617)
>>>>>>>         ...
>>>>>>
>>>>>> Am I missing something?
>>>>>>
>>>>>> I posted the same question here:
>>>>>> https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi
>>>>>>
>>>>>> Thank you.
>>>>>> Yohei Onishi
>>>
>>> --
>>> Yohei Onishi
