Verify in unit test that BigQuery executing project is used.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70c1e2c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70c1e2c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70c1e2c7

Branch: refs/heads/master
Commit: 70c1e2c7e005336d764ef9b0cf02afeb967feac5
Parents: ffed1d4
Author: Pei He <pe...@google.com>
Authored: Mon May 23 14:57:59 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue May 24 21:31:05 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 39 +++++++++++++++++---
 1 file changed, 33 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70c1e2c7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 7c360b9..2865b23 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -21,6 +21,8 @@ import static 
org.apache.beam.sdk.io.BigQueryIO.fromJsonString;
 import static org.apache.beam.sdk.io.BigQueryIO.toJsonString;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -64,6 +66,7 @@ import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.api.client.util.Data;
+import com.google.api.client.util.Strings;
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -205,6 +208,7 @@ public class BigQueryIOTest implements Serializable {
 
     private Object[] startJobReturns;
     private Object[] pollJobReturns;
+    private String executingProject;
     // Both counts will be reset back to zeros after serialization.
     // This is a work around for DoFn's verifyUnmodified check.
     private transient int startJobCallsCount;
@@ -238,27 +242,42 @@ public class BigQueryIOTest implements Serializable {
       return this;
     }
 
+    /**
+     * Verifies executing project.
+     */
+    public FakeJobService verifyExecutingProject(String executingProject) {
+      this.executingProject = executingProject;
+      return this;
+    }
+
     @Override
     public void startLoadJob(JobReference jobRef, JobConfigurationLoad 
loadConfig)
         throws InterruptedException, IOException {
-      startJob();
+      startJob(jobRef);
     }
 
     @Override
     public void startExtractJob(JobReference jobRef, JobConfigurationExtract 
extractConfig)
         throws InterruptedException, IOException {
-      startJob();
+      startJob(jobRef);
     }
 
     @Override
     public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
         throws IOException, InterruptedException {
-      startJob();
+      startJob(jobRef);
     }
 
     @Override
     public Job pollJob(JobReference jobRef, int maxAttempts)
         throws InterruptedException {
+      if (!Strings.isNullOrEmpty(executingProject)) {
+        checkArgument(
+            jobRef.getProjectId().equals(executingProject),
+            "Project id: %s is not equal to executing project: %s",
+            jobRef.getProjectId(), executingProject);
+      }
+
       if (pollJobStatusCallsCount < pollJobReturns.length) {
         Object ret = pollJobReturns[pollJobStatusCallsCount++];
         if (ret instanceof Job) {
@@ -276,7 +295,14 @@ public class BigQueryIOTest implements Serializable {
       }
     }
 
-    private void startJob() throws IOException, InterruptedException {
+    private void startJob(JobReference jobRef) throws IOException, 
InterruptedException {
+      if (!Strings.isNullOrEmpty(executingProject)) {
+        checkArgument(
+            jobRef.getProjectId().equals(executingProject),
+            "Project id: %s is not equal to executing project: %s",
+            jobRef.getProjectId(), executingProject);
+      }
+
       if (startJobCallsCount < startJobReturns.length) {
         Object ret = startJobReturns[startJobCallsCount++];
         if (ret instanceof IOException) {
@@ -479,7 +505,8 @@ public class BigQueryIOTest implements Serializable {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done")
-            .pollJobReturns(Status.UNKNOWN))
+            .pollJobReturns(Status.UNKNOWN)
+            .verifyExecutingProject(bqOptions.getProject()))
         .readerReturns(
             toJsonString(new TableRow().set("name", "a").set("number", 1)),
             toJsonString(new TableRow().set("name", "b").set("number", 2)),
@@ -487,7 +514,7 @@ public class BigQueryIOTest implements Serializable {
 
     Pipeline p = TestPipeline.create(bqOptions);
     PCollection<String> output = p
-        .apply(BigQueryIO.Read.from("foo.com:project:somedataset.sometable")
+        
.apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
             .withTestServices(fakeBqServices)
             .withoutValidation())
         .apply(ParDo.of(new DoFn<TableRow, String>() {

Reply via email to