Hi,

I want to update my question with a few new findings.

The GoogleCredentials instance is passed to the pipeline as shown below.


    GoogleCredentials credential = GoogleCredentials
        .fromStream(credentialJsonInputStream)
        .createScoped(
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/pubsub");
    credential.refreshIfExpired();
    options.setGcpCredential(credential);
    options.setProject("gcp-project-id");

The /options/ reference above extends PubsubOptions <https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.html>.
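For context, here is a minimal sketch of one way the credentialJsonInputStream used above could be produced without environment variables, assuming the service account JSON is supplied as an application property. The class name CredentialStreams, the method fromProperties and the property key are hypothetical and not part of Beam or of my actual project.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Hypothetical helper: turns a JSON key held in a property into a stream. */
public final class CredentialStreams {
    private CredentialStreams() {}

    /**
     * Returns the JSON stored under the given key as a UTF-8 InputStream.
     * Throws IllegalArgumentException if the property is missing or blank.
     */
    public static InputStream fromProperties(Properties props, String key) {
        String json = props.getProperty(key);
        if (json == null || json.trim().isEmpty()) {
            throw new IllegalArgumentException("No credential JSON under property: " + key);
        }
        return new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
    }
}
```

The returned stream could then be handed to GoogleCredentials.fromStream(...) as in the snippet above. Whether a property is an acceptable place for a key is a separate security decision.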


The exception returned when running the application is:


Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "The request is missing a valid API key.",
    "reason" : "forbidden"
  } ],
  "message" : "The request is missing a valid API key.",
  "status" : "PERMISSION_DENIED"
}
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
    at com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "The request is missing a valid API key.",
    "reason" : "forbidden"
  } ],
  "message" : "The request is missing a valid API key.",
  "status" : "PERMISSION_DENIED"
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.publish(PubsubJsonClient.java:141)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink$WriterFn.publishBatch(PubsubUnboundedSink.java:226)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink$WriterFn.processElement(PubsubUnboundedSink.java:266)



While debugging, I noticed that the |PubsubOptions| reference passed to org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClient <https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java#L82> returns |null| when calling |GcpOptions#getGcpCredential|.
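One possible explanation, which is only my assumption and not something I have confirmed in the Beam source: if the runner serializes the options before executing the pipeline and the credential property is excluded from that serialization, the value set in main() would never reach the Pub/Sub client. The sketch below reproduces that effect with plain Java serialization and a transient field. FakeOptions and roundTrip are hypothetical names and not Beam classes, and Java serialization is only an analogy here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class RoundTripSketch {
    private RoundTripSketch() {}

    /** Hypothetical stand-in for pipeline options; not a Beam class. */
    public static final class FakeOptions implements Serializable {
        private static final long serialVersionUID = 1L;
        public String project;               // included in the serialized form
        public transient Object credential;  // skipped during serialization
    }

    /** Serializes and deserializes the options, as a runner might do before execution. */
    public static FakeOptions roundTrip(FakeOptions in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream objIn =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FakeOptions) objIn.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

After roundTrip, the project field survives but the credential field is null, which resembles the |null| I see from |GcpOptions#getGcpCredential| inside the client factory.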

This project includes the following dependencies:

com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.10.2

com.amazonaws:aws-kinesisanalytics-runtime:1.2.0

org.apache.beam:beam-sdks-java-core:2.28.0

org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.28.0

org.apache.beam:beam-runners-flink-1.11:2.28.0

org.apache.beam:beam-sdks-java-io-amazon-web-services:2.28.0

org.apache.beam:beam-sdks-java-io-kinesis:2.28.0

com.amazonaws:aws-java-sdk-logs:1.11.903


com.amazonaws:aws-java-sdk-bom:1.11.903

com.google.cloud:libraries-bom:13.2.0


I would highly appreciate any help!


On 14/08/2021 15:19, Gayan Weerakutti wrote:
Hi,


I'm trying to deploy an Apache Beam application in a managed Apache Flink cluster (Kinesis Data Analytics <https://aws.amazon.com/kinesis/data-analytics/>). The pipeline uses the PubsubIO <https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.html> connector. But I can't get the application to authenticate with Google Cloud, because Kinesis Data Analytics does not allow exporting environment variables, so setting the GOOGLE_APPLICATION_CREDENTIALS <https://cloud.google.com/docs/authentication/getting-started> environment variable doesn't seem to be an option.

I tried to authenticate from code, as below.


    GoogleCredentials credential = GoogleCredentials.fromStream(credentialJsonInputStream);
    options.as(GcpOptions.class).setGcpCredential(credential);
    Pipeline pipeline = Pipeline.create(options);

But that didn't work. I'm not sure whether the GcpOptions <https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.html> interface is only supposed to be used with the Dataflow runner.


I'd really appreciate any insights on how to authenticate in this scenario.


Thanks and regards,
*Gayan Weerakutti*
<https://www.linkedin.com/in/reversiblean/>linkedin.com/in/gayanweerakutti <https://www.linkedin.com/in/gayanweerakutti/>
