Can you access the schema registry from a DoFn? It seemed to me that the issue is about setting up the JVM environment in such a way that it works with the schema registry. If you can't access it from KafkaIO, then I don't see how a different DoFn would be able to access it from within the same JVM.
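For reference, a minimal sketch of what that DoFn-based approach could look like: consume raw bytes and deserialize against the registry inside the DoFn. This is an illustrative sketch, not code from the thread; the class name, topic name, and registry URL are placeholders, it assumes the Confluent serializer jar is on the worker classpath, and it assumes the javax.net.ssl.* system properties discussed later in the thread are already in place on the worker JVM.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;

    // Illustrative: decode raw Kafka bytes against Schema Registry inside a
    // DoFn, so the registry client is constructed on the worker JVM.
    public class RegistryDecodeFn extends DoFn<KV<byte[], byte[]>, GenericRecord> {

      private transient KafkaAvroDeserializer deserializer;

      @Setup
      public void setup() {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", "https://host:port");
        deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, false); // false = value deserializer
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // The topic name is used for the subject lookup; "my-topic" is a placeholder.
        GenericRecord record =
            (GenericRecord) deserializer.deserialize("my-topic", c.element().getValue());
        c.output(record);
      }
    }

Note the output PCollection would still need a coder for GenericRecord, e.g. AvroCoder with a known reader schema.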
From: Yohei Onishi <[email protected]>
Date: Mon, May 13, 2019 at 2:41 AM
To: [email protected]

I got this answer from GCP support. Lukasz and Vishwas, any comments?

> Hello,
>
> The Dataflow specialist has gotten back to me; I will relay what they told me. The answer to your question is no, Apache Beam does not support Schema Registry. However, they have told me that you could implement the calls to Schema Registry yourself, as Beam only consumes raw messages and it is the user's responsibility to do whatever they need with the data. This is based on our understanding of the case: that you want to publish messages to Kafka and have Dataflow consume those messages, parsing them based on the schema from the registry. I hope this information can be useful to you; let me know if I can be of further help.

Yohei Onishi

On Thu, May 9, 2019 at 8:34 PM Yohei Onishi <[email protected]> wrote:

Thanks. I explicitly set the file locations and passwords in JAVA_TOOL_OPTIONS and in the Consumer Factory. The Consumer Factory downloads the keyStore and trustStore from GCS to local disk, as in
https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1

Then I got a FileNotFoundException like the one below, so it seems the Consumer Factory does not work well. When I create a temporary directory for those files using Files.createTempDirectory(prefix), job deployment works, but access to Schema Registry still fails.

Caused by: java.security.PrivilegedActionException: java.io.FileNotFoundException: /tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder.getKeyManagers(SSLContextImpl.java:928)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder.<clinit>(SSLContextImpl.java:864)
        at sun.security.ssl.SSLContextImpl$DefaultSSLContext.<init>(SSLContextImpl.java:1019)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.security.Provider$Service.newInstance(Provider.java:1595)
        ... 42 more
Caused by: java.io.FileNotFoundException: /tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at java.io.FileInputStream.<init>(FileInputStream.java:93)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:932)
        at sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:929)

So I am also implementing a solution without Schema Registry.

Yohei Onishi
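For illustration, a minimal sketch of that download-then-configure consumer factory, staging the JKS files into a fresh temp directory before the consumer (and registry client) touch them. The bucket paths, file names, and class name are placeholders, and the store passwords are assumed to arrive via the consumer properties:

    import java.nio.channels.Channels;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.Map;

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Illustrative consumer factory: stage TLS material on the worker, then
    // point both the Kafka consumer config and the JVM-wide system properties
    // (used by the registry client) at the staged files.
    public class MyConsumerFactory
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

      private static Path stage(String gcsPath, Path dir, String name) throws Exception {
        Path local = dir.resolve(name);
        Files.copy(
            Channels.newInputStream(
                FileSystems.open(FileSystems.matchNewResource(gcsPath, false))),
            local,
            StandardCopyOption.REPLACE_EXISTING);
        return local;
      }

      @Override
      public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
        try {
          Path dir = Files.createTempDirectory("tls");
          Path trustStore = stage("gs://my-bucket/client-truststore.jks", dir, "truststore.jks");
          Path keyStore = stage("gs://my-bucket/client-keystore.jks", dir, "keystore.jks");
          config.put("ssl.truststore.location", trustStore.toString());
          config.put("ssl.keystore.location", keyStore.toString());
          // The Confluent registry client at this version reads TLS settings
          // from JVM system properties, not from the consumer config.
          System.setProperty("javax.net.ssl.trustStore", trustStore.toString());
          System.setProperty("javax.net.ssl.keyStore", keyStore.toString());
          return new KafkaConsumer<>(config);
        } catch (Exception e) {
          throw new RuntimeException("Failed to stage TLS files", e);
        }
      }
    }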
On Thu, May 9, 2019 at 5:12 PM Vishwas Bm <[email protected]> wrote:

Hi Yohei,

The configuration exposed as an environment variable was picked up by the Beam java program. If you check the backtrace of the error, it has method calls into the Confluent schema registry during deserialization of records:

io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)

Thanks & Regards,
Vishwas

On Thu, May 9, 2019 at 12:32 PM Yohei Onishi <[email protected]> wrote:

Thanks. Did you use these configurations in your Apache Beam application? Does Apache Beam use the Confluent Schema Registry client internally?

Yohei Onishi

On Thu, May 9, 2019 at 1:12 PM Vishwas Bm <[email protected]> wrote:

Hi Yohei,

I had tried this some time back with the direct runner and faced the same issue as in https://github.com/confluentinc/schema-registry/issues/943 when interacting with TLS-enabled Kafka and Schema Registry. So I set the environment variable JAVA_TOOL_OPTIONS with the required properties, and then it worked:

export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=client-truststore.jks -Djavax.net.ssl.trustStorePassword=client-password -Djavax.net.ssl.keyStore=client-keystore.jks -Djavax.net.ssl.keyStorePassword=client-password"

Thanks & Regards,
Vishwas
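A quick way to check that those JVM-level properties actually reach the registry client is a standalone smoke test run with the same properties set. This is an illustrative sketch (the URL, paths, and passwords are placeholders; it assumes the Confluent client library is on the classpath):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

    // If the javax.net.ssl.* properties are correct, listing subjects over
    // HTTPS succeeds; otherwise this fails with the same PKIX or
    // FileNotFoundException errors seen on the Dataflow workers.
    public class RegistryTlsSmokeTest {
      public static void main(String[] args) throws Exception {
        System.setProperty("javax.net.ssl.trustStore", "/path/to/client-truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "client-password");
        System.setProperty("javax.net.ssl.keyStore", "/path/to/client-keystore.jks");
        System.setProperty("javax.net.ssl.keyStorePassword", "client-password");

        CachedSchemaRegistryClient client =
            new CachedSchemaRegistryClient("https://host:port", 100);
        System.out.println("Subjects: " + client.getAllSubjects());
      }
    }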
On Thu, May 9, 2019 at 5:13 AM Yohei Onishi <[email protected]> wrote:

Thanks, but I already did that, as mentioned. It seems that Apache Beam does not support Schema Registry. I will post about it once I have confirmed.

On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <[email protected]> wrote:

I replied to the SO question with more details. The issue is that you're trying to load a truststore file on the VM which doesn't exist. You need to make that file accessible in some way. The other SO question (https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1) has a code snippet where the user copies the truststore from GCS to a local tmp file path and then configures the map with that temp file path.

On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <[email protected]> wrote:

Can anyone tell me how the Schema Registry client in Apache Beam is configured? I tried to find it in the GitHub repo but was not able to: https://github.com/apache/beam

I want to make sure whether Apache Beam supports Schema Registry and does not have the same issue Confluent Schema Registry has: https://github.com/confluentinc/schema-registry/issues/943

Yohei Onishi

On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <[email protected]> wrote:

Hi,

I am developing a GCP Cloud Dataflow job that uses a Kafka broker and Schema Registry. Our Kafka broker and Schema Registry require a TLS client certificate, and I am facing a connection issue with Schema Registry on deployment. Any suggestion is highly welcome.

Here is what I do for the Dataflow job. I create Consumer Properties for the TLS configuration:

props.put("security.protocol", "SSL");
props.put("ssl.truststore.password", "aaa");
props.put("ssl.keystore.password", "bbb");
props.put("ssl.key.password", "ccc");
props.put("schema.registry.url", "https://host:port");
props.put("specific.avro.reader", true);

And I update the Consumer Properties with updateConsumerProperties:

Pipeline p = Pipeline.create(options)
  ...
  .updateConsumerProperties(properties)
  ...

As this Stack Overflow answer suggests, I also download the keyStore and trustStore to a local directory and set the trustStore / keyStore locations on the Consumer Properties in the ConsumerFactory:
https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1

Pipeline p = Pipeline.create(options)
  ...
  .withConsumerFactoryFn(new MyConsumerFactory(...))
  ...

In the ConsumerFactory:

public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
  // download keyStore and trustStore from GCS bucket
  config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
  config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
  return new KafkaConsumer<byte[], byte[]>(config);
}

With this code I succeeded in deployment, but the Dataflow job got a TLS server certificate verification error:
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
sun.security.validator.Validator.validate(Validator.java:260)
sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

Then I found that the Schema Registry client loads its TLS configuration from system properties:
https://github.com/confluentinc/schema-registry/issues/943

I tested a plain Kafka Consumer with the same configuration and confirmed it works fine:

props.put("schema.registry.url", "https://host:port");
props.put("specific.avro.reader", true);
props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.trustStorePassword"));
props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));

Next I applied the same approach to the Dataflow job code, that is, the same TLS configuration in both system properties and Consumer Properties.

I specified the passwords as system properties when executing the application:

-Djavax.net.ssl.keyStorePassword=aaa \
-Djavax.net.ssl.key.password=bbb \
-Djavax.net.ssl.trustStorePassword=ccc \

Note: I set the system properties for the trustStore and keyStore locations in the Consumer Factory, since those files are downloaded to a local temp directory:

config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath);
System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);

But even the deployment failed, with a timeout error:

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
        ...
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        ...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
        at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
        at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
        ...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
        at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
        ...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
        ...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
        ...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at java.security.Provider$Service.newInstance(Provider.java:1617)
        ...

Am I missing something?

I posted the same question here:
https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi

Thank you.
Yohei Onishi

--
Yohei Onishi
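To tie the pieces of the original question together, here is a minimal sketch of how the KafkaIO read might be wired with both the property map and a consumer factory. This is illustrative only, not code from the thread; the broker address, topic, and passwords are placeholders, and MyConsumerFactory refers to the factory sketched earlier in the thread.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class TlsKafkaPipeline {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Passwords only; the store *locations* are filled in by the consumer
        // factory after it stages the JKS files on the worker.
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.password", "aaa");
        props.put("ssl.keystore.password", "bbb");
        props.put("ssl.key.password", "ccc");

        p.apply(KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker:9093")
            .withTopic("my-topic")
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .updateConsumerProperties(props)
            .withConsumerFactoryFn(new MyConsumerFactory())
            .withoutMetadata());

        p.run();
      }
    }

Reading raw bytes this way sidesteps the registry during the KafkaIO read itself; the resulting KV<byte[], byte[]> elements could then be decoded against the registry in a downstream DoFn such as the RegistryDecodeFn sketched near the top of the thread.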
