Thanks. Did you use these configurations in your Apache Beam application? Does Apache Beam use the Confluent Schema Registry client internally?
Yohei Onishi

On Thu, May 9, 2019 at 1:12 PM Vishwas Bm <[email protected]> wrote:

> Hi Yohei,
>
> I had tried some time back with the direct runner and faced the same issue
> as in https://github.com/confluentinc/schema-registry/issues/943 when
> interacting with TLS-enabled Kafka and Schema Registry.
>
> So I set the environment variable JAVA_TOOL_OPTIONS with the required
> properties and then it worked:
>
> export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=client-truststore.jks
> -Djavax.net.ssl.trustStorePassword=client-password
> -Djavax.net.ssl.keyStore=client-keystore.jks
> -Djavax.net.ssl.keyStorePassword=client-password"
>
> *Thanks & Regards,*
>
> *Vishwas*
>
> On Thu, May 9, 2019 at 5:13 AM Yohei Onishi <[email protected]> wrote:
>
>> Thanks, but I already did that as mentioned.
>> It seems that Apache Beam does not support Schema Registry. I will post
>> about it once I have confirmed.
>>
>> On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <[email protected]> wrote:
>>
>>> I replied to the SO question with more details.
>>>
>>> The issue is that you're trying to load a truststore (file) on the VM
>>> which doesn't exist. You need to make that file accessible in some way.
>>> The other SO question (
>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1)
>>> has a code snippet where the user copies the truststore from GCS to a
>>> local temp file path and then configures the map with that temp file
>>> path.
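For reference, a minimal sketch of that copy step, assuming the
google-cloud-storage client library; the bucket and object names below are
placeholders, not values from this thread:

    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Copies gs://<bucket>/<object> to a local temp file and returns its path,
    // suitable as a value for ssl.truststore.location / ssl.keystore.location.
    static String downloadToTemp(String bucket, String object) throws IOException {
      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(BlobId.of(bucket, object));
      Path local = Files.createTempFile("kafka-tls-", ".jks");
      blob.downloadTo(local);
      return local.toString();
    }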
>>>
>>> On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <[email protected]> wrote:
>>>
>>>> Can anyone tell me how the Schema Registry client in Apache Beam is
>>>> configured?
>>>> I tried to find it in the GitHub repo but was not able to:
>>>> https://github.com/apache/beam
>>>>
>>>> I want to make sure that Apache Beam supports Schema Registry and does
>>>> not have the same issue Confluent Schema Registry has:
>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>
>>>> Yohei Onishi
>>>>
>>>> On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am developing a GCP Cloud Dataflow job that uses a Kafka broker and
>>>>> Schema Registry. Our Kafka broker and Schema Registry require a TLS
>>>>> client certificate, and I am facing a connection issue with Schema
>>>>> Registry on deployment. Any suggestion is highly welcome.
>>>>>
>>>>> Here is what I do for the Dataflow job. I create consumer properties
>>>>> with the TLS configuration:
>>>>>
>>>>>> props.put("security.protocol", "SSL");
>>>>>> props.put("ssl.truststore.password", "aaa");
>>>>>> props.put("ssl.keystore.password", "bbb");
>>>>>> props.put("ssl.key.password", "ccc");
>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>> props.put("specific.avro.reader", true);
>>>>>
>>>>> and pass them to the pipeline via updateConsumerProperties:
>>>>>
>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>     ...
>>>>>>     .updateConsumerProperties(properties)
>>>>>>     ...
>>>>>
>>>>> As this Stack Overflow answer suggests, I also download the keystore
>>>>> and truststore to a local directory and set the truststore / keystore
>>>>> locations on the consumer properties in the ConsumerFactory:
>>>>>
>>>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>>>>>
>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>     ...
>>>>>>     .withConsumerFactoryFn(new MyConsumerFactory(...))
>>>>>>     ...
>>>>>
>>>>> In the ConsumerFactory:
>>>>>
>>>>>> public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>>>   // download the keystore and truststore from a GCS bucket
>>>>>>   config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>>>   config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>>>   return new KafkaConsumer<byte[], byte[]>(config);
>>>>>> }
>>>>>
>>>>> With this code I succeeded in deployment, but the Dataflow job got a
>>>>> TLS server certificate verification error:
>>>>>
>>>>>> Caused by: sun.security.validator.ValidatorException: PKIX path
>>>>>> building failed:
>>>>>> sun.security.provider.certpath.SunCertPathBuilderException: unable to
>>>>>> find valid certification path to requested target
>>>>>>   sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>>   sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>>>>>>   sun.security.validator.Validator.validate(Validator.java:260)
>>>>>>   sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>>>>>>   sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>>>>>>   sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
>>>>>>   sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>>>>>>   java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>>>>>>   sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>>>>>>   io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>>>>>   io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>>>>>   io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>>>>>   io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>>>>>   io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>>>>>   io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>>>>>   io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>>>>>   io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>>>>>   io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>>>>>   org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>>>>>   org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>>>>>   org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>>>>>   org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>>>>>   org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
>>>>>>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>>>>>>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>>>>>>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>>>   org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>>>>>>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>>>>>>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>>>>>>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>>>>>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>   java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Then I found that the Schema Registry client loads its TLS
>>>>> configuration from system properties:
>>>>> https://github.com/confluentinc/schema-registry/issues/943
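A minimal standalone check of that system-property behavior, assuming the
properties point at valid local files; the host, port, paths, and passwords
below are placeholders:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

    public class RegistryTlsCheck {
      public static void main(String[] args) throws Exception {
        // The Confluent REST client uses the JVM's default SSL context,
        // which is built from the javax.net.ssl.* system properties.
        System.setProperty("javax.net.ssl.trustStore", "/tmp/client-truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "ccc");
        System.setProperty("javax.net.ssl.keyStore", "/tmp/client-keystore.jks");
        System.setProperty("javax.net.ssl.keyStorePassword", "aaa");

        CachedSchemaRegistryClient client =
            new CachedSchemaRegistryClient("https://host:port", 100);
        // getAllSubjects() makes an HTTPS request, so it fails fast on TLS issues.
        System.out.println(client.getAllSubjects());
      }
    }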
>>>>>
>>>>> I tested a plain Kafka consumer with the same configuration and
>>>>> confirmed that it works fine:
>>>>>
>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>> props.put("specific.avro.reader", true);
>>>>>> props.put("ssl.truststore.location",
>>>>>>     System.getProperty("javax.net.ssl.trustStore"));
>>>>>> props.put("ssl.truststore.password",
>>>>>>     System.getProperty("javax.net.ssl.trustStorePassword"));
>>>>>> props.put("ssl.keystore.location",
>>>>>>     System.getProperty("javax.net.ssl.keyStore"));
>>>>>> props.put("ssl.keystore.password",
>>>>>>     System.getProperty("javax.net.ssl.keyStorePassword"));
>>>>>> props.put("ssl.key.password",
>>>>>>     System.getProperty("javax.net.ssl.key.password"));
>>>>>
>>>>> Next I applied the same approach to the Dataflow job code, that is,
>>>>> applying the same TLS configuration to both the system properties and
>>>>> the consumer properties.
>>>>>
>>>>> I specified the passwords via system properties when executing the
>>>>> application:
>>>>>
>>>>>> -Djavax.net.ssl.keyStorePassword=aaa \
>>>>>> -Djavax.net.ssl.key.password=bbb \
>>>>>> -Djavax.net.ssl.trustStorePassword=ccc \
>>>>>
>>>>> Note: I set the system properties for the truststore and keystore
>>>>> locations in the ConsumerFactory, since those files are downloaded to
>>>>> a local temp directory:
>>>>>
>>>>>> config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>>> config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>>> System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath);
>>>>>> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);
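Putting those pieces together, the factory described above would look roughly
like this; downloadToTemp is the hypothetical GCS helper sketched earlier in
the thread, not an existing API:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
      // Fetch both stores from GCS to local temp files (hypothetical helper).
      String trustStore = downloadToTemp("my-bucket", "certs/client-truststore.jks");
      String keyStore = downloadToTemp("my-bucket", "certs/client-keystore.jks");

      // The Kafka consumer reads its TLS settings from the consumer config...
      config.put("ssl.truststore.location", trustStore);
      config.put("ssl.keystore.location", keyStore);

      // ...while the Schema Registry client reads them from system properties.
      System.setProperty("javax.net.ssl.trustStore", trustStore);
      System.setProperty("javax.net.ssl.keyStore", keyStore);

      return new KafkaConsumer<>(config);
    }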
>>>>>
>>>>> However, this time even the deployment failed, with a timeout error:
>>>>>
>>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to
>>>>>> construct instance from factory method
>>>>>> DataflowRunner#fromOptions(interface
>>>>>> org.apache.beam.sdk.options.PipelineOptions)
>>>>>>   at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>>>>>   ...
>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>   ...
>>>>>> Caused by: java.lang.IllegalArgumentException: DataflowRunner
>>>>>> requires gcpTempLocation, but failed to retrieve a value from
>>>>>> PipelineOptions
>>>>>>   at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
>>>>>> Caused by: java.lang.IllegalArgumentException: Error constructing
>>>>>> default value for gcpTempLocation: tempLocation is not a valid GCS path,
>>>>>> gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
>>>>>>   at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
>>>>>>   ...
>>>>>> Caused by: java.lang.RuntimeException: Unable to verify that GCS
>>>>>> bucket gs://dev-k8s-rfid-store-dataflow exists.
>>>>>>   at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
>>>>>>   ...
>>>>>> Caused by: java.io.IOException: Error getting access token for
>>>>>> service account: java.security.NoSuchAlgorithmException: Error
>>>>>> constructing implementation (algorithm: Default, provider: SunJSSE,
>>>>>> class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>   at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
>>>>>>   ...
>>>>>> Caused by: java.net.SocketException:
>>>>>> java.security.NoSuchAlgorithmException: Error constructing implementation
>>>>>> (algorithm: Default, provider: SunJSSE, class:
>>>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>   at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
>>>>>>   ...
>>>>>> Caused by: java.security.NoSuchAlgorithmException: Error constructing
>>>>>> implementation (algorithm: Default, provider: SunJSSE, class:
>>>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>   at java.security.Provider$Service.newInstance(Provider.java:1617)
>>>>>>   ...
>>>>>
>>>>> Am I missing something?
>>>>>
>>>>> I posted the same question here:
>>>>> https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi
>>>>>
>>>>> Thank you.
>>>>> Yohei Onishi
>>>>
>> --
>> Yohei Onishi
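A minimal reproduction of the failure mode in that last trace, under the
assumption that the javax.net.ssl.* properties point at files that do not
exist on the JVM performing the deployment; the path below is a placeholder:

    import javax.net.ssl.SSLContext;

    public class DefaultSslContextRepro {
      public static void main(String[] args) throws Exception {
        // Point the default SSL context at a keystore path that does not exist.
        System.setProperty("javax.net.ssl.keyStore", "/nonexistent/client-keystore.jks");
        System.setProperty("javax.net.ssl.keyStorePassword", "aaa");

        // SunJSSE builds its DefaultSSLContext from the javax.net.ssl.*
        // properties, so an unreadable keystore surfaces as
        // NoSuchAlgorithmException (algorithm: Default), the same error the
        // GCS access-token refresh hits above.
        SSLContext.getDefault();
      }
    }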
