Can anyone tell me how the Schema Registry client in Apache Beam is configured? I tried to find it in the GitHub repo but was not able to. https://github.com/apache/beam
I want to confirm whether Apache Beam supports Schema Registry, and whether it avoids the issue the Confluent Schema Registry client has: https://github.com/confluentinc/schema-registry/issues/943

Yohei Onishi

On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <[email protected]> wrote:

> Hi,
>
> I am developing a GCP Cloud Dataflow job that uses a Kafka broker and
> Schema Registry. Our Kafka broker and Schema Registry require a TLS client
> certificate, and I am facing a connection issue with the Schema Registry
> when the job is deployed. Any suggestion is very welcome.
>
> Here is what I do in the Dataflow job. I create consumer properties with
> the TLS configuration:
>
>> props.put("security.protocol", "SSL");
>> props.put("ssl.truststore.password", "aaa");
>> props.put("ssl.keystore.password", "bbb");
>> props.put("ssl.key.password", "ccc");
>> props.put("schema.registry.url", "https://host:port");
>> props.put("specific.avro.reader", true);
>
> Then I pass them to the pipeline with updateConsumerProperties:
>
>> Pipeline p = Pipeline.create(options)
>>     ...
>>     .updateConsumerProperties(properties)
>>     ...
>
> As this Stack Overflow answer suggests, I also download the keystore and
> truststore to a local directory and set the truststore / keystore
> locations on the consumer properties in a consumer factory:
>
> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>
>> Pipeline p = Pipeline.create(options)
>>     ...
>>     .withConsumerFactoryFn(new MyConsumerFactory(...))
>>     ...
>
> In the consumer factory:
>
>> public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>   // download the keystore and truststore from a GCS bucket
>>   config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>   config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>   return new KafkaConsumer<byte[], byte[]>(config);
>> }
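> For reference, a self-contained sketch of what such a factory can look
> like is below. This is an illustration only: downloadFromGcs and the
> gs:// paths are hypothetical placeholders for whatever GCS download logic
> the job actually uses.
>
>> import java.util.Map;
>> import org.apache.beam.sdk.transforms.SerializableFunction;
>> import org.apache.kafka.clients.consumer.Consumer;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>
>> // Runs wherever KafkaIO needs to create a consumer, in particular on
>> // each Dataflow worker: download the stores to local disk first, then
>> // point the Kafka consumer config at them.
>> public class MyConsumerFactory
>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>
>>   @Override
>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>     // Hypothetical helper; replace with real GCS client code.
>>     String localTrustStoreFilePath = downloadFromGcs("gs://my-bucket/truststore.jks");
>>     String localKeyStoreFilePath = downloadFromGcs("gs://my-bucket/keystore.jks");
>>
>>     // Kafka's own TLS layer picks these up; the Schema Registry REST
>>     // client does not (see the error below).
>>     config.put("ssl.truststore.location", localTrustStoreFilePath);
>>     config.put("ssl.keystore.location", localKeyStoreFilePath);
>>     return new KafkaConsumer<>(config);
>>   }
>>
>>   private static String downloadFromGcs(String gcsPath) {
>>     // Placeholder: copy the object to a local temp file, return its path.
>>     throw new UnsupportedOperationException("replace with GCS download for " + gcsPath);
>>   }
>> }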
> With this code the deployment succeeded, but the Dataflow job then failed
> with a TLS server certificate verification error:
>
>> Caused by: sun.security.validator.ValidatorException: PKIX path building
>> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable
>> to find valid certification path to requested target
>>   sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>   sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>>   sun.security.validator.Validator.validate(Validator.java:260)
>>   sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>>   sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>>   sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
>>   sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>>   java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>>   sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>>   io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>   io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>   io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>   io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>   io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>   io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>   io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>   io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>   io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>   org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>   org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>   org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>   org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>   org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
>>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>   org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   java.lang.Thread.run(Thread.java:745)
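> The frames above show where the handshake actually fails: not in the
> Kafka consumer, but in the Schema Registry REST client
> (RestService.sendHttpRequest), which opens a plain HttpsURLConnection and
> therefore uses the JVM default SSLContext. A minimal probe, using the
> same placeholder URL as above, reproduces that behavior; the ssl.*
> consumer properties never reach this code path:
>
>> import java.net.HttpURLConnection;
>> import java.net.URL;
>>
>> public class RegistryTlsProbe {
>>   public static void main(String[] args) throws Exception {
>>     // "host:port" is the placeholder registry address from above;
>>     // /subjects is a standard Schema Registry REST endpoint.
>>     URL url = new URL("https://host:port/subjects");
>>     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
>>     // Unless the JVM default trust store (javax.net.ssl.trustStore)
>>     // trusts the registry's certificate, this fails with the same
>>     // "PKIX path building failed" error as the job.
>>     System.out.println(conn.getResponseCode());
>>   }
>> }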
> Then I found that the Schema Registry client loads its TLS configuration
> from JVM system properties rather than from the consumer properties:
> https://github.com/confluentinc/schema-registry/issues/943
>
> I tested a plain Kafka consumer with the same configuration and confirmed
> that it works fine:
>
>> props.put("schema.registry.url", "https://host:port");
>> props.put("specific.avro.reader", true);
>> props.put("ssl.truststore.location",
>>     System.getProperty("javax.net.ssl.trustStore"));
>> props.put("ssl.truststore.password",
>>     System.getProperty("javax.net.ssl.trustStorePassword"));
>> props.put("ssl.keystore.location",
>>     System.getProperty("javax.net.ssl.keyStore"));
>> props.put("ssl.keystore.password",
>>     System.getProperty("javax.net.ssl.keyStorePassword"));
>> props.put("ssl.key.password",
>>     System.getProperty("javax.net.ssl.key.password"));
>
> Next I applied the same approach to the Dataflow job code, i.e. I set the
> same TLS configuration both as system properties and as consumer
> properties.
>
> I specified the passwords as system properties when launching the
> application:
>
>> -Djavax.net.ssl.keyStorePassword=aaa \
>> -Djavax.net.ssl.key.password=bbb \
>> -Djavax.net.ssl.trustStorePassword=ccc \
>
> Note: I set the system properties for the truststore and keystore
> locations in the consumer factory, since those files are downloaded to a
> local temp directory:
>
>> config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>> config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>> System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath);
>> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);
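> One caveat with this approach: -D flags given when launching the pipeline
> only configure the launcher JVM; Dataflow workers run in separate JVMs
> provisioned by the service and never see those flags. A sketch of one
> alternative (an illustration only, reusing the hypothetical
> downloadFromGcs helper from the earlier sketch) is to serialize the
> passwords into the factory itself, so that everything, including the
> javax.net.ssl.* properties, is set on the worker and the launcher JVM's
> default SSLContext is left untouched:
>
>> import java.util.Map;
>> import org.apache.beam.sdk.transforms.SerializableFunction;
>> import org.apache.kafka.clients.consumer.Consumer;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>
>> public class WorkerTlsConsumerFactory
>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>
>>   // Serialized with the pipeline and shipped to each worker.
>>   private final String trustStorePassword;
>>   private final String keyStorePassword;
>>
>>   public WorkerTlsConsumerFactory(String trustStorePassword, String keyStorePassword) {
>>     this.trustStorePassword = trustStorePassword;
>>     this.keyStorePassword = keyStorePassword;
>>   }
>>
>>   @Override
>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>     String trustStore = downloadFromGcs("gs://my-bucket/truststore.jks");
>>     String keyStore = downloadFromGcs("gs://my-bucket/keystore.jks");
>>
>>     // For the Schema Registry REST client, which reads only JVM system
>>     // properties (schema-registry issue #943).
>>     System.setProperty("javax.net.ssl.trustStore", trustStore);
>>     System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword);
>>     System.setProperty("javax.net.ssl.keyStore", keyStore);
>>     System.setProperty("javax.net.ssl.keyStorePassword", keyStorePassword);
>>
>>     // For Kafka's own TLS connection to the brokers.
>>     config.put("ssl.truststore.location", trustStore);
>>     config.put("ssl.keystore.location", keyStore);
>>     return new KafkaConsumer<>(config);
>>   }
>>
>>   private static String downloadFromGcs(String gcsPath) {
>>     // Hypothetical placeholder; replace with real GCS client code.
>>     throw new UnsupportedOperationException("replace with GCS download for " + gcsPath);
>>   }
>> }
>
> Note that overriding javax.net.ssl.trustStore replaces the default trust
> store for every HTTPS connection in that worker JVM, including calls to
> Google APIs, so the custom truststore would also need to contain the
> usual public CA certificates.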
> However, with these settings even the deployment failed, with the
> following error:
>
>> Exception in thread "main" java.lang.RuntimeException: Failed to construct
>> instance from factory method DataflowRunner#fromOptions(interface
>> org.apache.beam.sdk.options.PipelineOptions)
>>   at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>   ...
>> Caused by: java.lang.reflect.InvocationTargetException
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   ...
>> Caused by: java.lang.IllegalArgumentException: DataflowRunner requires
>> gcpTempLocation, but failed to retrieve a value from PipelineOptions
>>   at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
>> Caused by: java.lang.IllegalArgumentException: Error constructing default
>> value for gcpTempLocation: tempLocation is not a valid GCS path,
>> gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
>>   at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
>>   ...
>> Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket
>> gs://dev-k8s-rfid-store-dataflow exists.
>>   at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
>>   ...
>> Caused by: java.io.IOException: Error getting access token for service
>> account: java.security.NoSuchAlgorithmException: Error constructing
>> implementation (algorithm: Default, provider: SunJSSE, class:
>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>   at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
>>   ...
>> Caused by: java.net.SocketException:
>> java.security.NoSuchAlgorithmException: Error constructing implementation
>> (algorithm: Default, provider: SunJSSE, class:
>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>   at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
>>   ...
>> Caused by: java.security.NoSuchAlgorithmException: Error constructing
>> implementation (algorithm: Default, provider: SunJSSE, class:
>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>   at java.security.Provider$Service.newInstance(Provider.java:1617)
>>   ...
>
> Am I missing something?
>
> I posted the same question here:
> https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi
>
> Thank you.
> Yohei Onishi
