This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 989c928  [SQL] Support complex identifiers in DataCatalog
     new fd67fd3  Merge pull request #9353 from 
akedin/datacatalog-custom-name-resolution
989c928 is described below

commit 989c928f87b1e195d06862414b3ed6545e60ab2f
Author: akedin <ke...@google.com>
AuthorDate: Thu Aug 15 12:09:39 2019 -0700

    [SQL] Support complex identifiers in DataCatalog
---
 build.gradle                                       |   1 +
 sdks/java/extensions/sql/datacatalog/build.gradle  |  36 +++-
 .../datacatalog/DataCatalogTableProvider.java      |  24 ++-
 .../meta/provider/datacatalog/ZetaSqlIdUtils.java  |  79 ++++++++
 .../datacatalog/DataCatalogBigQueryIT.java         |  98 ++++++++++
 .../provider/datacatalog/ZetaSqlIdUtilsTest.java   |  64 +++++++
 .../extensions/sql/impl/TableResolutionUtils.java  |   4 +-
 .../sql/meta/provider/FullNameTableProvider.java   | 166 +++++++++++++++++
 .../sql/meta/CustomTableResolverTest.java          | 206 +++++++--------------
 .../beam/sdk/io/gcp/bigquery/TestBigQuery.java     |   4 +
 10 files changed, 538 insertions(+), 144 deletions(-)

diff --git a/build.gradle b/build.gradle
index d7e7c06..fd0c74a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -158,6 +158,7 @@ task javaPostCommit() {
 task sqlPostCommit() {
   dependsOn ":sdks:java:extensions:sql:postCommit"
   dependsOn ":sdks:java:extensions:sql:jdbc:postCommit"
+  dependsOn ":sdks:java:extensions:sql:datacatalog:postCommit"
 }
 
 task javaPostCommitPortabilityApi () {
diff --git a/sdks/java/extensions/sql/datacatalog/build.gradle 
b/sdks/java/extensions/sql/datacatalog/build.gradle
index b9bff77..530c2ec 100644
--- a/sdks/java/extensions/sql/datacatalog/build.gradle
+++ b/sdks/java/extensions/sql/datacatalog/build.gradle
@@ -1,5 +1,3 @@
-import groovy.json.JsonOutput
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,6 +16,8 @@ import groovy.json.JsonOutput
  * limitations under the License.
  */
 
+import groovy.json.JsonOutput
+
 plugins { id 'org.apache.beam.module' }
 
 applyJavaNature()
@@ -59,3 +59,35 @@ task runDataCatalogExample(type: JavaExec) {
     "--tempLocation=${gcsTempRoot}",
   ]
 }
+
+
+task integrationTest(type: Test) {
+  group = "Verification"
+  def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+  def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 
'gs://temp-storage-for-end-to-end-tests/'
+
+  // Never treat this task as up to date; otherwise Gradle would skip it and the ITs would not run.
+  outputs.upToDateWhen { false }
+
+  def pipelineOptions = [
+          "--project=${gcpProject}",
+          "--tempLocation=${gcsTempRoot}",
+          "--blockOnRun=false"]
+
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)
+
+  include '**/*IT.class'
+  maxParallelForks 4
+  classpath = project(":sdks:java:extensions:sql:datacatalog")
+          .sourceSets
+          .test
+          .runtimeClasspath
+  testClassesDirs = 
files(project(":sdks:java:extensions:sql:datacatalog").sourceSets.test.output.classesDirs)
+  useJUnit {}
+}
+
+task postCommit {
+  group = "Verification"
+  description = "Various integration tests"
+  dependsOn integrationTest
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
index 1ca0959..3fa8594 100644
--- 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
@@ -27,16 +27,19 @@ import java.util.Map;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /** Uses DataCatalog to get the source type and schema for a table. */
-public class DataCatalogTableProvider implements TableProvider {
+public class DataCatalogTableProvider extends FullNameTableProvider {
 
   private Map<String, TableProvider> delegateProviders;
   private Map<String, Table> tableCache;
@@ -92,8 +95,23 @@ public class DataCatalogTableProvider implements 
TableProvider {
   }
 
   @Override
-  public @Nullable Table getTable(String tableName) {
-    return loadTable(tableName);
+  public @Nullable Table getTable(String tableNamePart) {
+    throw new UnsupportedOperationException(
+        "Loading a table by partial name '" + tableNamePart + "' is 
unsupported");
+  }
+
+  @Override
+  public @Nullable Table getTableByFullName(TableName fullTableName) {
+
+    ImmutableList<String> allNameParts =
+        ImmutableList.<String>builder()
+            .addAll(fullTableName.getPath())
+            .add(fullTableName.getTableName())
+            .build();
+
+    String fullEscapedTableName = ZetaSqlIdUtils.escapeAndJoin(allNameParts);
+
+    return loadTable(fullEscapedTableName);
   }
 
   private @Nullable Table loadTable(String tableName) {
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtils.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtils.java
new file mode 100644
index 0000000..eac82c4
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.List;
+import java.util.regex.Pattern;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** Utils to work with ZetaSQL-compatible IDs. */
+class ZetaSqlIdUtils {
+
+  /**
+   * Some special characters we explicitly handle.
+   *
+   * <p>Whitespace such as tabs and newlines is not matched here; see {@code WHITESPACES}.
+   */
+  private static final Pattern SPECIAL_CHARS_ESCAPE =
+      Pattern.compile(
+          "(?<SpecialChar>["
+              + "\\\\" // backslash
+              + "`" //    backtick
+              + "'" //    single quote
+              + "\"" //   double quote
+              + "?" //    question mark
+              + "])");
+
+  private static final ImmutableMap<String, String> WHITESPACES =
+      ImmutableMap.of(
+          "\n", "\\\\n",
+          "\t", "\\\\t",
+          "\r", "\\\\r",
+          "\f", "\\\\f");
+
+  private static final Pattern SIMPLE_ID = 
Pattern.compile("[A-Za-z_][A-Za-z_0-9]*");
+
+  /**
+   * Joins parts into a single compound ZetaSQL identifier.
+   *
+   * <p>Escapes backslashes, backticks, quotes, question marks, and common whitespace characters;
+   * doesn't handle other special characters for now.
+   */
+  static String escapeAndJoin(List<String> parts) {
+    return parts.stream()
+        .map(ZetaSqlIdUtils::escapeSpecialChars)
+        .map(ZetaSqlIdUtils::replaceWhitespaces)
+        .map(ZetaSqlIdUtils::backtickIfNeeded)
+        .collect(joining("."));
+  }
+
+  private static String escapeSpecialChars(String str) {
+    return SPECIAL_CHARS_ESCAPE.matcher(str).replaceAll("\\\\${SpecialChar}");
+  }
+
+  private static String replaceWhitespaces(String s) {
+    return WHITESPACES.keySet().stream()
+        .reduce(s, (str, whitespace) -> str.replaceAll(whitespace, 
WHITESPACES.get(whitespace)));
+  }
+
+  private static String backtickIfNeeded(String s) {
+    return SIMPLE_ID.matcher(s).matches() ? s : ("`" + s + "`");
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java
 
b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java
new file mode 100644
index 0000000..4c0d6e9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for DataCatalog+BigQuery. */
+@RunWith(JUnit4.class)
+public class DataCatalogBigQueryIT {
+
+  private static final Schema ID_NAME_SCHEMA =
+      Schema.builder().addNullableField("id", INT64).addNullableField("name", 
STRING).build();
+
+  @Rule public transient TestPipeline writeToBQPipeline = 
TestPipeline.create();
+  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+  @Rule public transient TestBigQuery bigQuery = 
TestBigQuery.create(ID_NAME_SCHEMA);
+
+  @Test
+  public void testReadWrite() throws Exception {
+    createBQTableWith(
+        new TableRow().set("id", 1).set("name", "name1"),
+        new TableRow().set("id", 2).set("name", "name2"),
+        new TableRow().set("id", 3).set("name", "name3"));
+
+    TableReference bqTable = bigQuery.tableReference();
+    String tableId =
+        String.format(
+            "bigquery.`table`.`%s`.`%s`.`%s`",
+            bqTable.getProjectId(), bqTable.getDatasetId(), 
bqTable.getTableId());
+
+    PCollection<Row> result =
+        readPipeline.apply(
+            "query",
+            SqlTransform.query("SELECT id, name FROM " + tableId)
+                .withDefaultTableProvider(
+                    "datacatalog", 
DataCatalogTableProvider.create(readPipeline.getOptions())));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "name1"), row(2, "name2"), 
row(3, "name3"));
+    readPipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  private Row row(long id, String name) {
+    return Row.withSchema(ID_NAME_SCHEMA).addValues(id, name).build();
+  }
+
+  private void createBQTableWith(TableRow r1, TableRow r2, TableRow r3) {
+    writeToBQPipeline
+        .apply(Create.of(r1, r2, r3).withCoder(TableRowJsonCoder.of()))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to(bigQuery.tableSpec())
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new 
TableFieldSchema().setName("id").setType("INTEGER"),
+                                new 
TableFieldSchema().setName("name").setType("STRING"))))
+                .withoutValidation());
+    writeToBQPipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtilsTest.java
 
b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtilsTest.java
new file mode 100644
index 0000000..ff34ef8
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtilsTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+/** Unit tests for {@link ZetaSqlIdUtils}. */
+public class ZetaSqlIdUtilsTest {
+
+  @Test
+  public void testHandlesSimpleIds() {
+    List<String> id = Arrays.asList("aaa", "BbB", "zAzzz00");
+    assertEquals("aaa.BbB.zAzzz00", ZetaSqlIdUtils.escapeAndJoin(id));
+  }
+
+  @Test
+  public void testHandlesMixedIds() {
+    List<String> id = Arrays.asList("aaa", "Bb---B", "zAzzz00");
+    assertEquals("aaa.`Bb---B`.zAzzz00", ZetaSqlIdUtils.escapeAndJoin(id));
+  }
+
+  @Test
+  public void testHandlesSpecialChars() {
+    List<String> id = Arrays.asList("a\\a", "b`b", "c'c", "d\"d", "e?e");
+    assertEquals("`a\\\\a`.`b\\`b`.`c\\'c`.`d\\\"d`.`e\\?e`", 
ZetaSqlIdUtils.escapeAndJoin(id));
+  }
+
+  @Test
+  public void testHandlesSpecialCharsInOnePart() {
+    List<String> id = Arrays.asList("a\\ab`bc'cd\"de?e");
+    assertEquals("`a\\\\ab\\`bc\\'cd\\\"de\\?e`", 
ZetaSqlIdUtils.escapeAndJoin(id));
+  }
+
+  @Test
+  public void testHandlesWhiteSpaces() {
+    List<String> id = Arrays.asList("a\na", "b\tb", "c\rc", "d\fd");
+    assertEquals("`a\\na`.`b\\tb`.`c\\rc`.`d\\fd`", 
ZetaSqlIdUtils.escapeAndJoin(id));
+  }
+
+  @Test
+  public void testHandlesWhiteSpacesInOnePart() {
+    List<String> id = Arrays.asList("a\nab\tbc\rcd\fd");
+    assertEquals("`a\\nab\\tbc\\rcd\\fd`", ZetaSqlIdUtils.escapeAndJoin(id));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java
index af6146b..247f1f7 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java
@@ -34,8 +34,8 @@ import org.apache.calcite.sql.SqlNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** TableResolutionUtils. */
-public class TableResolutionUtils {
+/** Utils to wire up the custom table resolution into Calcite's planner. */
+class TableResolutionUtils {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TableResolutionUtils.class);
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java
new file mode 100644
index 0000000..066e779
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.meta.CustomTableResolver;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/**
+ * Base class for table providers that look up table metadata using full table 
names, instead of
+ * querying it by parts of the name separately.
+ */
+@Experimental
+public abstract class FullNameTableProvider implements TableProvider, 
CustomTableResolver {
+
+  private List<TableName> knownTables;
+
+  protected FullNameTableProvider() {
+    knownTables = new ArrayList<>();
+  }
+
+  public abstract Table getTableByFullName(TableName fullTableName);
+
+  @Override
+  public void registerKnownTableNames(List<TableName> tableNames) {
+    knownTables.addAll(tableNames);
+  }
+
+  @Override
+  public TableProvider getSubProvider(String name) {
+    // TODO: implement with trie
+
+    // If 'name' matches a sub-schema/sub-provider we start tracking
+    // the subsequent calls to getSubProvider().
+    //
+    // Simple table ids and final table lookup
+    //
+    // If there is no matching sub-schema then returning null from here 
indicates
+    // that 'name' is either not part of this schema or it's a table, not a 
sub-schema,
+    // this will be checked right after this in a getTable() call.
+    //
+    // Because this is a getSubProvider() call it means Calcite expects
+    // the sub-schema/sub-provider to be returned, not a table,
+    // so we only need to check against known compound table identifiers.
+    // If 'name' actually represents a simple identifier then it will be checked
+    // in a 'getTable()' call later. Unless there's the same sub-provider name,
+    // in which case it's a conflict and we will use the sub-schema and not 
assume it's a table.
+    // Calcite does the same.
+    //
+    // Here we find if there are any parsed tables that start from 'name' that 
belong to this
+    // table provider.
+    // We then create a fake tracking provider that in a trie-manner collects
+    // getSubProvider()/getTable() calls by checking whether there are known 
parsed table names
+    // matching what Calcite asks us for.
+    List<TableName> tablesToLookFor =
+        knownTables.stream()
+            .filter(TableName::isCompound)
+            .filter(tableName -> tableName.getPrefix().equals(name))
+            .collect(toList());
+
+    return tablesToLookFor.size() > 0 ? new TableNameTrackingProvider(1, 
tablesToLookFor) : null;
+  }
+
+  /**
+   * Calcite calls getSubProvider()/getTable() on this class when resolving a 
table name. This class
+   * keeps track of these calls and checks against known table names 
(extracted from a query), so
+   * that when a full table name is parsed out it calls the actual table 
provider to get a table
+   * based on the full name, instead of calling it component by component.
+   *
+   * <p>This class enables table providers to query their metadata source using full table names.
+   */
+  class TableNameTrackingProvider extends InMemoryMetaTableProvider {
+    int schemaLevel;
+    List<TableName> tableNames;
+
+    TableNameTrackingProvider(int schemaLevel, List<TableName> tableNames) {
+      this.schemaLevel = schemaLevel;
+      this.tableNames = tableNames;
+    }
+
+    @Override
+    public TableProvider getSubProvider(String name) {
+      // Find if any of the parsed table names have 'name' as part
+      // of their path at current index.
+      //
+      // If there are, return a new tracking provider for such tables and 
incremented index.
+      //
+      // If there are none, it means something weird has happened and 
returning null
+      // will make Calcite try other schemas. Maybe things will work out.
+      //
+      // However since we originally register all parsed table names for the 
given schema
+      // in this provider we should only receive a getSubProvider() call for 
something unknown
+      // when it's a leaf path element, i.e. actual table name, which will be 
handled in
+      // getTable() call.
+      List<TableName> matchingTables =
+          tableNames.stream()
+              .filter(TableName::isCompound)
+              .filter(tableName -> tableName.getPath().size() > schemaLevel)
+              .filter(tableName -> 
tableName.getPath().get(schemaLevel).equals(name))
+              .collect(toList());
+
+      return matchingTables.size() > 0
+          ? new TableNameTrackingProvider(schemaLevel + 1, matchingTables)
+          : null;
+    }
+
+    @Override
+    public String getTableType() {
+      return "google.cloud.datacatalog.subprovider";
+    }
+
+    @Nullable
+    @Override
+    public Table getTable(String name) {
+
+      // This is called only after getSubProvider() returned null,
+      // and since we are tracking the actual parsed table names, this should
+      // be it, there should exist a parsed table that matches the 'name'.
+
+      Optional<TableName> matchingTable =
+          tableNames.stream()
+              .filter(tableName -> tableName.getTableName().equals(name))
+              .findFirst();
+
+      TableName fullTableName =
+          matchingTable.orElseThrow(
+              () ->
+                  new IllegalStateException(
+                      "Unexpected table '"
+                          + name
+                          + "' requested. Current schema level is "
+                          + schemaLevel
+                          + ". Current known table names: "
+                          + tableNames.toString()));
+      return FullNameTableProvider.this.getTableByFullName(fullTableName);
+    }
+
+    @Override
+    public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
+      return FullNameTableProvider.this.buildBeamSqlTable(table);
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java
index 484e031..bd70391 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java
@@ -17,16 +17,12 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta;
 
-import static java.util.stream.Collectors.toList;
-
 import java.io.Serializable;
-import java.util.List;
-import java.util.Optional;
-import javax.annotation.Nullable;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.extensions.sql.impl.TableName;
-import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
@@ -37,7 +33,7 @@ import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 
-/** CustomTableResolverTest. */
+/** Test for custom table resolver and full name table provider. */
 public class CustomTableResolverTest implements Serializable {
 
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
@@ -51,126 +47,62 @@ public class CustomTableResolverTest implements 
Serializable {
    * <p>Demonstrates how to parse table names as in normal Calcite queries 
syntax, e.g. {@code
    * a.b.c.d} and convert them to its' own custom table name format {@code 
a_b_c_d}.
    */
-  public static class CustomResolutionTestTableProvider extends 
TestTableProvider
-      implements CustomTableResolver {
+  public static class CustomResolutionTestTableProvider extends 
FullNameTableProvider {
+
+    TestTableProvider delegateTableProvider;
+
+    public CustomResolutionTestTableProvider() {
+      delegateTableProvider = new TestTableProvider();
+    }
+
+    @Override
+    public Table getTable(String tableName) {
+      return delegateTableProvider.getTable(tableName);
+    }
+
+    @Override
+    public Table getTableByFullName(TableName fullTableName) {
+      // For the test, tables are registered with underscores instead of dots,
+      // so here we join the name parts with underscores
+      // to look up the matching table.
+      String actualTableName =
+          String.join("_", fullTableName.getPath()) + "_" + 
fullTableName.getTableName();
+      return delegateTableProvider.getTable(actualTableName);
+    }
 
-    List<TableName> parsedTableNames = null;
+    @Override
+    public String getTableType() {
+      return delegateTableProvider.getTableType();
+    }
 
     @Override
-    public void registerKnownTableNames(List<TableName> tableNames) {
-      parsedTableNames = tableNames;
+    public void createTable(Table table) {
+      delegateTableProvider.createTable(table);
+    }
+
+    public void addRows(String tableName, Row... rows) {
+      delegateTableProvider.addRows(tableName, rows);
     }
 
     @Override
-    public TableProvider getSubProvider(String name) {
-      // TODO: implement with trie
-
-      // If 'name' matches a sub-schema/sub-provider we start tracking
-      // the subsequent calls to getSubProvider().
-      //
-      // Simple table ids and final table lookup
-      //
-      // If there is no matching sub-schema then returning null from here 
indicates
-      // that 'name' is either not part of this schema or it's a table, not a 
sub-schema,
-      // this will be checked right after this in a getTable() call.
-      //
-      // Because this is a getSubProvider() call it means Calcite expects
-      // the sub-schema/sub-provider to be returned, not a table,
-      // so we only need to check against known compound table identifiers.
-      // If 'name' acutally represents a simple identifier then it will be 
checked
-      // in a 'getTable()' call later. Unless there's the same sub-provider 
name,
-      // in which case it's a conflict and we will use the sub-schema and not 
assume it's a table.
-      // Calcite does the same.
-      //
-      // Here we find if there are any parsed tables that start from 'name' 
that belong to this
-      // table provider.
-      // We then create a fake tracking provider that in a trie-manner collects
-      // getSubProvider()/getTable() calls by checking whether there are known 
parsed table names
-      // matching what Calcite asks us for.
-      List<TableName> tablesToLookFor =
-          parsedTableNames.stream()
-              .filter(TableName::isCompound)
-              .filter(tableName -> tableName.getPrefix().equals(name))
-              .collect(toList());
-
-      return tablesToLookFor.size() > 0 ? new TableNameTrackingProvider(1, 
tablesToLookFor) : null;
+    public void dropTable(String tableName) {
+      delegateTableProvider.dropTable(tableName);
     }
 
-    class TableNameTrackingProvider extends TestTableProvider {
-      int schemaLevel;
-      List<TableName> tableNames;
-
-      TableNameTrackingProvider(int schemaLevel, List<TableName> tableNames) {
-        this.schemaLevel = schemaLevel;
-        this.tableNames = tableNames;
-      }
-
-      @Override
-      public TableProvider getSubProvider(String name) {
-        // Find if any of the parsed table names have 'name' as part
-        // of their path at current index.
-        //
-        // If there are, return a new tracking provider for such tables and 
incremented index.
-        //
-        // If there are none, it means something weird has happened and 
returning null
-        // will make Calcite try other schemas. Maybe things will work out.
-        //
-        // However since we originally register all parsed table names for the 
given schema
-        // in this provider we should only receive a getSubProvider() call for 
something unknown
-        // when it's a leaf path element, i.e. actual table name, which will 
be handled in
-        // getTable() call.
-        List<TableName> matchingTables =
-            tableNames.stream()
-                .filter(TableName::isCompound)
-                .filter(tableName -> tableName.getPath().size() > schemaLevel)
-                .filter(tableName -> 
tableName.getPath().get(schemaLevel).equals(name))
-                .collect(toList());
-
-        return matchingTables.size() > 0
-            ? new TableNameTrackingProvider(schemaLevel + 1, matchingTables)
-            : null;
-      }
-
-      @Nullable
-      @Override
-      public Table getTable(String name) {
-
-        // This is called only after getSubProvider() returned null,
-        // and since we are tracking the actual parsed table names, this should
-        // be it, there should exist a parsed table that matches the 'name'.
-
-        Optional<TableName> matchingTable =
-            tableNames.stream()
-                .filter(tableName -> tableName.getTableName().equals(name))
-                .findFirst();
-
-        TableName tableName =
-            matchingTable.orElseThrow(
-                () ->
-                    new IllegalStateException(
-                        "Unexpected table '"
-                            + name
-                            + "' requested. Current schema level is "
-                            + schemaLevel
-                            + ". Current known table names: "
-                            + tableNames.toString()));
-        // For test we register tables with underscore instead of dots, so 
here we lookup the tables
-        // with those underscore
-        String actualTableName =
-            String.join("_", tableName.getPath()) + "_" + 
tableName.getTableName();
-        return 
CustomResolutionTestTableProvider.this.getTable(actualTableName);
-      }
-
-      @Override
-      public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
-        return CustomResolutionTestTableProvider.this.buildBeamSqlTable(table);
-      }
+    @Override
+    public Map<String, Table> getTables() {
+      return delegateTableProvider.getTables();
+    }
+
+    @Override
+    public BeamSqlTable buildBeamSqlTable(Table table) {
+      return delegateTableProvider.buildBeamSqlTable(table);
     }
   }
 
   @Test
   public void testSimpleId() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
@@ -187,7 +119,7 @@ public class CustomTableResolverTest implements Serializable {
 
   @Test
   public void testSimpleIdWithExplicitDefaultSchema() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
@@ -204,12 +136,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testSimpleIdWithExplicitDefaultSchemaWithMultipleProviders() 
throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four"));
@@ -227,12 +159,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testSimpleIdWithExplicitNonDefaultSchema() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four"));
@@ -250,7 +182,7 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testCompoundIdInDefaultSchema() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two"));
@@ -267,7 +199,7 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testCompoundIdInExplicitDefaultSchema() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two"));
@@ -284,7 +216,7 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testLongCompoundIdInDefaultSchema() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
@@ -301,12 +233,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testLongCompoundIdInDefaultSchemaWithMultipleProviders() throws 
Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, 
"four"));
@@ -324,7 +256,7 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testLongCompoundIdInExplicitDefaultSchema() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
@@ -341,12 +273,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testLongCompoundIdInNonDefaultSchemaSameTableNames() throws 
Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, 
"four"));
@@ -364,12 +296,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testLongCompoundIdInNonDefaultSchemaDifferentNames() throws 
Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         Table.builder()
             .name("testtable2_blah2_foo2_bar2")
@@ -391,12 +323,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testJoinWithLongCompoundIds() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
@@ -420,12 +352,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testInnerJoinWithLongCompoundIds() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
@@ -449,12 +381,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testJoinWithLongCompoundIdsWithAliases() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
@@ -478,12 +410,12 @@ public class CustomTableResolverTest implements 
Serializable {
 
   @Test
   public void testUnionWithLongCompoundIds() throws Exception {
-    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider = new 
CustomResolutionTestTableProvider();
     tableProvider.createTable(
         
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
     tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
 
-    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider();
     tableProvider2.createTable(
         
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
     tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
index f362fcd..4b4a97e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
@@ -189,6 +189,10 @@ public class TestBigQuery implements TestRule {
         table.getTableReference().getTableId());
   }
 
+  public TableReference tableReference() {
+    return table.getTableReference();
+  }
+
   /**
    * Loads rows from BigQuery into {@link Row Rows} with given {@link Schema}.
    *

Reply via email to