Thanks Austin and Luke for replying to my message.
I ran some experiments; here are my code snippets.
Maven:
<beam.version>2.21.0</beam.version>

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-spanner-jdbc</artifactId>
  <version>1.15.0</version>
</dependency>
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-spanner</artifactId>
  <version>1.56.0</version>
</dependency>
Java code:
public class SpannerJdbcToCsvText {
  private static final Logger LOG =
      LoggerFactory.getLogger(SpannerJdbcToCsvText.class);

  public interface SpannerToTextOptions
      extends PipelineOptions,
          SpannerReadOptions,
          JavascriptTextTransformerOptions,
          FilesystemWriteOptions {
  }

  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToTextOptions.class);
    SpannerToTextOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);
    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    // ----- block 1 start ---------------------
    // Block 1 prints to the logs on my local Mac desktop.
    List<String> list = new ArrayList<>();
    try {
      String projectId = "projectId";
      String instanceId = "instanceId";
      String databaseId = "databaseId";
      String credentialsFile = "/my/mac/local/path/credentials.json";
      try (Connection connection =
          DriverManager.getConnection(
              String.format(
                  "jdbc:cloudspanner:/projects/%s/instances/%s/databases/%s?credentials=%s",
                  projectId, instanceId, databaseId,
                  credentialsFile))) {
        try (Statement statement = connection.createStatement()) {
          try (ResultSet rs = statement.executeQuery("SELECT name from t2")) {
            while (rs.next()) {
              list.add(rs.getString(1));
              LOG.info("print outside get value: " + rs.getString(1));
            }
          }
        }
      }
    } catch (Exception e) {
      LOG.error("", e);
    }
    PCollection<String> results =
        pipeline.apply(Create.of(list).withType(TypeDescriptor.of(String.class))).setCoder(StringUtf8Coder.of());
    // ----- block 1 end ---------------------

    // ----- block 2 start ---------------------
    // Block 2 prints to the server logs.
    results.apply("print value", ParDo.of(new MapFn()));
    // ----- block 2 end ---------------------

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}
I ran the program like this:
mvn compile exec:java \
-Dexec.mainClass=com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText \
-Dexec.args="--runner=DataflowRunner \
--region=us-central1"
These logs were printed in my local console:
org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage
was not specified. Defaulting to files from the classpath: will stage 351 files.
Enable logging at DEBUG level to see which files will be staged.
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value:
myname
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 2
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 3
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 4
org.apache.beam.runners.dataflow.DataflowRunner - Executing pipeline on the
Dataflow Service, which will have billing implications related to Google
Compute Engine usage and other Google Cloud Services.
org.apache.beam.runners.dataflow.util.PackageUtil - Uploading 351 files from
PipelineOptions.filesToStage to staging location to prepare for execution.
org.apache.beam.runners.dataflow.util.PackageUtil - Uploading
/Users/shengyang/ws/Stubhub-DataPlatform.dataworks-ingestion/target/classes to
gs://dataflow-staging-us-central1-661544897337/temp/staging/classes-KrjSD-Y0s4i28kG-XmiBiw.jar
These logs were printed on the GCP servers:
2020-06-30 09:44:57.483 HKT
Finished processing stage F0 with 0 errors in 0.28 seconds
2020-06-30 09:44:59.600 HKT
Starting MapTask stage s01
2020-06-30 09:45:00.916 HKT
in mapfn - get value:myname
2020-06-30 09:45:00.934 HKT
Finished processing stage s01 with 0 errors in 1.333 seconds
2020-06-30 09:45:03.025 HKT
Starting MapTask stage s01
2020-06-30 09:45:03.046 HKT
in mapfn - get value:4
2020-06-30 09:45:03.047 HKT
Finished processing stage s01 with 0 errors in 0.022 seconds
2020-06-30 09:45:05.148 HKT
Starting MapTask stage s01
2020-06-30 09:45:05.166 HKT
in mapfn - get value:2
2020-06-30 09:45:05.176 HKT
Finished processing stage s01 with 0 errors in 0.028 seconds
Why does the Spanner JDBC call (block 1) happen on my local machine during the
compile phase, while MapFn (block 2) runs on the server side? I expected all of
it to happen on the server side.
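To make the question concrete, here is a toy plain-Java sketch of the behaviour I seem to be seeing. It does not use Beam at all, and every name in it (ToyPipeline, loadNames, buildAndRun) is hypothetical. The point is only that code written directly in main() runs immediately on the machine that launches the job, while a function handed to the pipeline runs later, when run() is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class DeferredPipelineSketch {
  // Records what happens, in order, so the two phases are visible.
  static final List<String> events = new ArrayList<>();

  // Hypothetical stand-in for a pipeline: it stores steps and runs nothing until run().
  static class ToyPipeline {
    private final List<String> input;
    private final List<Function<String, String>> steps = new ArrayList<>();

    ToyPipeline(List<String> input) {
      this.input = new ArrayList<>(input);
    }

    ToyPipeline apply(Function<String, String> step) {
      steps.add(step); // only remembered here, not executed
      return this;
    }

    List<String> run() {
      List<String> out = new ArrayList<>();
      for (String element : input) {
        String value = element;
        for (Function<String, String> step : steps) {
          value = step.apply(value);
        }
        out.add(value);
      }
      return out;
    }
  }

  // Stand-in for the JDBC query in block 1: an ordinary method call in main().
  static List<String> loadNames() {
    events.add("construction: loadNames");
    return Arrays.asList("myname", "2", "3", "4");
  }

  static List<String> buildAndRun() {
    events.clear();
    // Like block 1: the data is fetched right here, while the pipeline is being built.
    ToyPipeline pipeline = new ToyPipeline(loadNames());
    // Like block 2: the lambda is stored and only invoked inside run().
    pipeline.apply(value -> {
      events.add("execution: " + value);
      return value;
    });
    events.add("construction: done");
    return pipeline.run();
  }

  public static void main(String[] args) {
    buildAndRun();
    events.forEach(System.out::println);
  }
}
```

In this toy version both "construction" events are recorded before any "execution" event, which matches the order of my local and server logs above.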
At 2020-06-30 00:17:51, "Luke Cwik" <[email protected]> wrote:
The intent is that you grant permissions to the account that is running the
Dataflow job to the resources you want it to access in project B before you
start the pipeline. This allows for much finer-grained access control and the
ability to revoke permissions without having to disable an entire account.
I would take a look at the general IAM and security documentation within GCP[1]
or open up a support case with GCP requesting guidance.
1: https://cloud.google.com/iam
On Sun, Jun 28, 2020 at 8:56 AM Austin Bennett <[email protected]>
wrote:
I haven't tried it yet, but it looks like the connection string asks for the
project to be specified. Based on that (and cross-project access working in other
circumstances), I would imagine it will work, but...? Give it a try!
One tricky part might be ensuring proper permissions in both projects (without
being too open).
On Sat, Jun 27, 2020, 5:46 AM Sheng Yang <[email protected]> wrote:
Hi,
I am working on Beam with the Dataflow runner. Recently I have been trying to read
Spanner data from a different project. Say I run my Beam Dataflow job in GCP
project A, but the Spanner instance is in GCP project B. I searched the
documentation but couldn't find anything about SpannerIO reading data with a
custom credential key file. Right now I am considering JdbcIO because it accepts
custom credentials as parameters, and Spanner also has a JDBC API[1].
Is anything in my description wrong? Or am I considering the right
approach?
String url = "jdbc:cloudspanner:/projects/my_project_id/"
    + "instances/my_instance_id/" + "databases/my_database_name"
    + "?credentials=/home/cloudspanner-keys/my-key.json" + ";autocommit=false";
try (Connection connection = DriverManager.getConnection(url)) {
  try (ResultSet rs = connection.createStatement()
      .executeQuery("SELECT SingerId, AlbumId, MarketingBudget FROM Albums")) {
    while (rs.next()) {
      Long singerId = rs.getLong(1);
    }
  }
}
[1]: https://github.com/googleapis/java-spanner-jdbc
Thanks,
Sheng