I got this answer from GCP support. Lukasz and Vishwas, any comments?

> Hello,
>
> the Dataflow specialist has gotten back to me. I will now relay what they
> have told me. The answer to your question is no, Apache Beam does not
> support Schema Registry. However, they have told me that you could
> implement the calls to Schema Registry yourself, as Beam only consumes raw
> messages and it is the user's responsibility to do whatever they need with
> the data. This is based on our understanding of the case: you want to
> publish messages to Kafka and have Dataflow consume those messages, parsing
> them based on the schema from the registry. I hope this information is
> useful to you; let me know if I can be of further help.

Yohei Onishi
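
For what it is worth, a minimal sketch of the "implement the calls to Schema
Registry yourself" approach the specialist describes, assuming the Confluent
wire format (a magic byte followed by a 4-byte schema id) and the
CachedSchemaRegistryClient API. The class name is illustrative, the registry
URL is the placeholder used elsewhere in this thread, and since this client
talks to the registry over HTTPS, the javax.net.ssl.* question discussed
below still applies:

import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class ManualRegistryDecoder {

  // Placeholder URL from this thread; the client caches schemas by id.
  private final SchemaRegistryClient client =
      new CachedSchemaRegistryClient("https://host:port", 100);

  public GenericRecord decode(byte[] payload) throws Exception {
    ByteBuffer buf = ByteBuffer.wrap(payload);
    if (buf.get() != 0x0) {
      throw new IllegalArgumentException("Not a Confluent-framed Avro record");
    }
    int schemaId = buf.getInt();                     // 4-byte schema id after the magic byte
    Schema writerSchema = client.getById(schemaId);  // HTTPS call to the registry (cached)
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
    return reader.read(
        null, DecoderFactory.get().binaryDecoder(payload, 5, payload.length - 5, null));
  }
}

A decoder like this could be called from a DoFn or from a custom Kafka value
deserializer, so Beam itself keeps consuming raw byte[] records as the
specialist suggests.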

On Thu, May 9, 2019 at 8:34 PM Yohei Onishi <[email protected]> wrote:

Thanks,

I explicitly set the file locations and passwords in JAVA_TOOL_OPTIONS and in
the Consumer Factory. The Consumer Factory downloads the keyStore and
trustStore from GCS to the local file system, as in:

https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1

Then I got a FileNotFoundException like the one below, so it seems the
Consumer Factory does not work well. When I create a temporary directory for
those files using Files.createTempDirectory(prefix), job deployment works but
access to Schema Registry still fails.

Caused by: java.security.PrivilegedActionException: java.io.FileNotFoundException: /tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder.getKeyManagers(SSLContextImpl.java:928)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder.<clinit>(SSLContextImpl.java:864)
        at sun.security.ssl.SSLContextImpl$DefaultSSLContext.<init>(SSLContextImpl.java:1019)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.security.Provider$Service.newInstance(Provider.java:1595)
        ... 42 more
Caused by: java.io.FileNotFoundException: /tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at java.io.FileInputStream.<init>(FileInputStream.java:93)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:932)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:929)

So I am also implementing a solution without Schema Registry.

Yohei Onishi

On Thu, May 9, 2019 at 5:12 PM Vishwas Bm <[email protected]> wrote:

Hi Yohei,

The configuration exposed as an environment variable was picked up by the
Beam Java program.

If you check the backtrace of the error, it has method calls to the Confluent
Schema Registry during deserialization of records:

io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)

Thanks & Regards,
Vishwas

On Thu, May 9, 2019 at 12:32 PM Yohei Onishi <[email protected]> wrote:

Thanks. Did you use these configurations in your Apache Beam application?
Does Apache Beam use the Confluent Schema Registry client internally?

Yohei Onishi

On Thu, May 9, 2019 at 1:12 PM Vishwas Bm <[email protected]> wrote:

Hi Yohei,

I had tried some time back with the direct runner and faced the same issue as
in https://github.com/confluentinc/schema-registry/issues/943 when
interacting with TLS-enabled Kafka and Schema Registry.

So I set the environment variable JAVA_TOOL_OPTIONS with the required
properties and then it worked:

export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=client-truststore.jks \
  -Djavax.net.ssl.trustStorePassword=client-password \
  -Djavax.net.ssl.keyStore=client-keystore.jks \
  -Djavax.net.ssl.keyStorePassword=client-password"

Thanks & Regards,
Vishwas

On Thu, May 9, 2019 at 5:13 AM Yohei Onishi <[email protected]> wrote:

Thanks, but I already did that as mentioned. It seems that Apache Beam does
not support Schema Registry. I will post about it after I have confirmed.

On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <[email protected]> wrote:

I replied to the SO question with more details.

The issue is that you're trying to load a truststore file on the VM which
doesn't exist. You need to make that file accessible in some way. The other
SO question
(https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1)
has a code snippet where the user copies the truststore from GCS to a local
tmp file path and then configures the map with that temp file path.
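
A rough sketch of the pattern Lukasz and the linked answer describe: a
consumer factory that stages the JKS files from GCS on the worker and then
points the consumer config at the local copies. The class name, bucket and
object names, and the use of the google-cloud-storage client here are
illustrative only:

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TlsConsumerFactory
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    try {
      // Stage the JKS files on this worker before the consumer is created.
      Storage storage = StorageOptions.getDefaultInstance().getService();
      Path dir = Files.createTempDirectory("kafka-tls");
      Path trustStore = dir.resolve("truststore.jks");
      Path keyStore = dir.resolve("keystore.jks");
      storage.get(BlobId.of("my-bucket", "tls/truststore.jks")).downloadTo(trustStore);
      storage.get(BlobId.of("my-bucket", "tls/keystore.jks")).downloadTo(keyStore);

      // Point the Kafka consumer config at the local copies.
      config.put("ssl.truststore.location", trustStore.toString());
      config.put("ssl.keystore.location", keyStore.toString());
      return new KafkaConsumer<>(config);
    } catch (Exception e) {
      throw new RuntimeException("Failed to stage TLS files from GCS", e);
    }
  }
}

This only covers the broker connection; whether the Confluent Schema Registry
client also picks up these files is exactly the open question in the rest of
this thread.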

On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <[email protected]> wrote:

Can anyone tell me how the Schema Registry client in Apache Beam is
configured? I tried to find it in the GitHub repo but was not able to.
https://github.com/apache/beam

I want to make sure whether Apache Beam supports Schema Registry and does not
have the same issue the Confluent Schema Registry client has:
https://github.com/confluentinc/schema-registry/issues/943

Yohei Onishi

On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <[email protected]> wrote:

Hi,

I am developing a GCP Cloud Dataflow job that uses a Kafka broker and Schema
Registry. Our Kafka broker and Schema Registry require a TLS client
certificate, and I am facing a connection issue with Schema Registry on
deployment. Any suggestion is highly welcome.

Here is what I do for the Dataflow job. I create Consumer Properties for the
TLS configuration:

props.put("security.protocol", "SSL");
props.put("ssl.truststore.password", "aaa");
props.put("ssl.keystore.password", "bbb");
props.put("ssl.key.password", "ccc");
props.put("schema.registry.url", "https://host:port");
props.put("specific.avro.reader", true);

And I update the Consumer Properties with updateConsumerProperties:

Pipeline p = Pipeline.create(options)
 ...
 .updateConsumerProperties(properties)
 ...

As this Stack Overflow answer suggests, I also download the keyStore and
trustStore to a local directory and set the trustStore / keyStore locations
on the Consumer Properties in the ConsumerFactory:

https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1

Pipeline p = Pipeline.create(options)
 ...
 .withConsumerFactoryFn(new MyConsumerFactory(...))
 ...

In the ConsumerFactory:

public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
  // download keyStore and trustStore from GCS bucket
  config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
  config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
  return new KafkaConsumer<byte[], byte[]>(config);
}
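
As an aside for readers: updateConsumerProperties and withConsumerFactoryFn
hang off KafkaIO.Read rather than off the Pipeline object itself. A minimal
sketch of that wiring, with placeholder broker and topic names, reading raw
byte[] values so the registry work stays in a custom value deserializer or a
downstream DoFn:

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    Map<String, Object> props = new HashMap<>();
    props.put("security.protocol", "SSL");
    props.put("schema.registry.url", "https://host:port");
    // ... the remaining password / keystore / truststore properties from the post ...

    PCollection<KafkaRecord<byte[], byte[]>> records =
        p.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("broker:9093")               // placeholder
                .withTopic("my-topic")                             // placeholder
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                // .withConsumerFactoryFn(new MyConsumerFactory(...))  // as in the post
                .updateConsumerProperties(props));                 // TLS + registry settings

    p.run();
  }
}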

With this code I succeeded in deployment but the Dataflow job got TLS server
certificate verification error.

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
        sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
        sun.security.validator.Validator.validate(Validator.java:260)
        sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
        sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
        io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
        io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Then I found that the Schema Registry client loads its TLS configuration from
system properties:
https://github.com/confluentinc/schema-registry/issues/943

I tested a Kafka Consumer with the same configuration, and I confirmed it
works fine:

props.put("schema.registry.url", "https://host:port");
props.put("specific.avro.reader", true);
props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.trustStorePassword"));
props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));
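
As an aside, a standalone check of that kind (plain Kafka client, no Beam)
might look roughly like the following with a recent kafka-clients; broker,
group id, and topic names are placeholders, and the SSL and registry
properties are the ones listed just above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StandaloneConsumerCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9093");   // placeholder
    props.put("group.id", "tls-check");              // placeholder
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // ... plus the same SSL and schema.registry.url properties listed above ...

    try (KafkaConsumer<byte[], Object> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
      ConsumerRecords<byte[], Object> records = consumer.poll(Duration.ofSeconds(10));
      for (ConsumerRecord<byte[], Object> record : records) {
        System.out.println(record.value());          // deserialized via the registry
      }
    }
  }
}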

Next I applied the same approach to the Dataflow job code, i.e. I set the
same TLS configuration both as system properties and as Consumer Properties.

I specified the passwords as system properties when executing the application:

-Djavax.net.ssl.keyStorePassword=aaa \
-Djavax.net.ssl.key.password=bbb \
-Djavax.net.ssl.trustStorePassword=ccc \

Note: I set the system properties for the trustStore and keyStore locations
in the Consumer Factory, since those files are downloaded to a local temp
directory:

config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath);
System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);

But even the deployment failed, with a timeout error:

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
        ...
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        ...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
        at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
        at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
        ...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
        at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
        ...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
        ...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
        ...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at java.security.Provider$Service.newInstance(Provider.java:1617)
        ...

Am I missing something?

I posted the same question here:
https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi

Thank you.
Yohei Onishi

--
Yohei Onishi
