This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dc4667  [SQL] Add custom table name resolution
     new a44f7d8  Merge pull request #9343 from 
akedin/custom-table-name-resolution
2dc4667 is described below

commit 2dc46671311b0203db5aaf2906e0ad8c21ed8b14
Author: akedin <ke...@google.com>
AuthorDate: Tue Apr 30 16:04:31 2019 -0700

    [SQL] Add custom table name resolution
---
 .../extensions/sql/TableNameExtractionUtils.java   |  98 ++++
 .../extensions/sql/impl/CalciteQueryPlanner.java   |   5 +-
 .../beam/sdk/extensions/sql/impl/TableName.java    | 100 ++++
 .../extensions/sql/impl/TableResolutionUtils.java  | 214 +++++++++
 .../extensions/sql/meta/CustomTableResolver.java   |  45 ++
 .../CalciteCannotParseSimpleIdentifiersTest.java   |  77 +++
 .../sql/CalciteParsesSimpleIdentifiersTest.java    | 133 ++++++
 .../sql/meta/CustomTableResolverTest.java          | 514 +++++++++++++++++++++
 8 files changed, 1185 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableNameExtractionUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableNameExtractionUtils.java
new file mode 100644
index 0000000..556c246
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableNameExtractionUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.calcite.sql.SqlAsOperator;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSetOperator;
+
+/**
+ * Helper class to extract table identifiers from the query.
+ *
+ * <p>Supports queries:
+ *
+ * <pre>
+ *   ... FROM table...
+ *   ... FROM table1, table2 AS x...
+ *   ... FROM table1 JOIN (LEFT, INNER, OUTER etc) table2 JOIN table3 ...
+ *   ... FROM table1 UNION (INTERSECT etc) SELECT ...
+ * </pre>
+ */
+public class TableNameExtractionUtils {
+
+  public static List<TableName> extractTableNamesFromNode(SqlNode node) {
+    if (node instanceof SqlSelect) {
+      return extractTableFromSelect((SqlSelect) node);
+    }
+
+    if (node instanceof SqlIdentifier) {
+      return extractFromIdentifier((SqlIdentifier) node);
+    }
+
+    if (node instanceof SqlJoin) {
+      return extractFromJoin((SqlJoin) node);
+    }
+
+    if (node instanceof SqlCall) {
+      return extractFromCall((SqlCall) node);
+    }
+
+    return Collections.emptyList();
+  }
+
+  private static List<TableName> extractTableFromSelect(SqlSelect node) {
+    return extractTableNamesFromNode(node.getFrom());
+  }
+
+  private static List<TableName> extractFromCall(SqlCall node) {
+    if (node.getOperator() instanceof SqlAsOperator) {
+      return extractTableNamesFromNode(node.getOperandList().get(0));
+    }
+
+    if (node.getOperator() instanceof SqlSetOperator) {
+      return node.getOperandList().stream()
+          .map(TableNameExtractionUtils::extractTableNamesFromNode)
+          .flatMap(Collection::stream)
+          .collect(toList());
+    }
+
+    return Collections.emptyList();
+  }
+
+  private static List<TableName> extractFromJoin(SqlJoin join) {
+    return ImmutableList.<TableName>builder()
+        .addAll(extractTableNamesFromNode(join.getLeft()))
+        .addAll(extractTableNamesFromNode(join.getRight()))
+        .build();
+  }
+
+  private static List<TableName> extractFromIdentifier(SqlIdentifier 
identifier) {
+    return ImmutableList.of(TableName.create(identifier.names));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index 10b529c..8215346 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -71,9 +71,11 @@ class CalciteQueryPlanner implements QueryPlanner {
   private static final Logger LOG = 
LoggerFactory.getLogger(CalciteQueryPlanner.class);
 
   private final Planner planner;
+  private final JdbcConnection connection;
 
   public CalciteQueryPlanner(JdbcConnection connection, RuleSet[] ruleSets) {
-    planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets));
+    this.connection = connection;
+    this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets));
   }
 
   public FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] 
ruleSets) {
@@ -138,6 +140,7 @@ class CalciteQueryPlanner implements QueryPlanner {
     BeamRelNode beamRelNode;
     try {
       SqlNode parsed = planner.parse(sqlStatement);
+      TableResolutionUtils.setupCustomTableResolution(connection, parsed);
       SqlNode validated = planner.validate(parsed);
       LOG.info("SQL:\n" + validated);
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableName.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableName.java
new file mode 100644
index 0000000..282f0c2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableName.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+
+/** Represents a parsed table name that is specified in a FROM clause (and 
other places). */
+@AutoValue
+public abstract class TableName {
+
+  /**
+   * Table path up to the leaf table name.
+   *
+   * <p>Does not necessarily start from a schema name.
+   *
+   * <p>Does not include the actual table name, see {@link #getTableName()}.
+   */
+  public abstract List<String> getPath();
+
+  /** Table name, the last element of the fully-specified table name with 
path. */
+  public abstract String getTableName();
+
+  /** Full table name with path. */
+  public static TableName create(List<String> fullPath) {
+    checkNotNull(fullPath, "Full table path cannot be null");
+    checkArgument(fullPath.size() > 0, "Full table path has to have at least 
one element");
+    return create(fullPath.subList(0, fullPath.size() - 1), 
fullPath.get(fullPath.size() - 1));
+  }
+
+  /** Table name plus the path up to but not including table name. */
+  public static TableName create(List<String> path, String tableName) {
+    checkNotNull(tableName, "Table name cannot be null");
+    return new AutoValue_TableName(path == null ? Collections.emptyList() : 
path, tableName);
+  }
+
+  /** Whether it's a compound table name (with multiple path components). */
+  public boolean isCompound() {
+    return getPath().size() > 0;
+  }
+
+  /** Whether it's a simple name, with a single name component. */
+  public boolean isSimple() {
+    return getPath().size() == 0;
+  }
+
+  /** First element in the path. */
+  public String getPrefix() {
+    checkState(isCompound());
+    return getPath().get(0);
+  }
+
+  /**
+   * Remove prefix, e.g. this is helpful when stripping the top-level schema 
to register a table
+   * name with a provider.
+   */
+  public TableName removePrefix() {
+    List<String> pathPostfix = getPath().stream().skip(1).collect(toList());
+    return TableName.create(pathPostfix, getTableName());
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java
new file mode 100644
index 0000000..af6146b
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.TableNameExtractionUtils;
+import org.apache.beam.sdk.extensions.sql.meta.CustomTableResolver;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.sql.SqlNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** TableResolutionUtils. */
+public class TableResolutionUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TableResolutionUtils.class);
+
+  /**
+   * Extract table names from the FROM clauses, register them with root 
TableProviders that support
+   * custom table schema resolution, e.g. DataCatalog.
+   *
+   * <p>Go over top-level schemas in the JdbcConnection, and for all top-level 
table providers that
+   * support custom table resolution, register all the parsed table names with 
them.
+   *
+   * <p>This way when a table provider has custom name-resolution strategy it 
can analyze whether it
+   * supports the name without using Calcite's logic. E.g. for DataCatalog we 
need to assemble the
+   * table name back into a single string and then query the back-end, whereas 
Calcite would require
+   * us to call the back-end for each part of the table name.
+   *
+   * <p>The logic is:
+   *
+   * <pre>
+   *   - if it's a compound identifier (table name contains multiple parts):
+   *       - get the first part of the identifier, assume it represents a 
top-level schema;
+   *       - find a top-level table provider with the same name;
+   *       - register the table identifier with it, if supported;
+   *       - if not supported, then ignore the table identifier, everything 
will be resolved using
+   *         existing Calcite's logic;
+   *
+   *   - if it's a simple identifier (contains only a table name without a 
schema part),
+   *     or if there was no matching top-level schema:
+   *       - register with the default schema, if it supports custom table 
resolution;
+   *       - if it does not, existing Calcite logic will still work as is;
+   * </pre>
+   */
+  static void setupCustomTableResolution(JdbcConnection connection, SqlNode 
parsed) {
+    List<TableName> tableNames = 
TableNameExtractionUtils.extractTableNamesFromNode(parsed);
+    String currentSchemaName = getCurrentSchemaName(connection);
+
+    SchemaWithName defaultSchema = SchemaWithName.create(connection, 
currentSchemaName);
+
+    if (defaultSchema.supportsCustomResolution()) {
+      registerWithDefaultSchema(connection, tableNames, defaultSchema);
+    }
+
+    registerWithTopLevelSchemas(connection, tableNames);
+  }
+
+  /** Current (default) schema name in the JdbcConnection. */
+  private static String getCurrentSchemaName(JdbcConnection connection) {
+    try {
+      return connection.getSchema();
+    } catch (SQLException e) {
+      throw new IllegalStateException(
+          "Unable to get current schema name from JdbcConnection. "
+              + "Assuming table names in the query are fully-qualified from 
the root.",
+          e);
+    }
+  }
+
+  /**
+   * Simple identifiers have to be resolved by the default schema, as well as 
compoung identifiers
+   * that don't have a matching top-level schema (meaning that a user didn't 
specify a top-level
+   * schema and expected it to be inferred).
+   */
+  private static void registerWithDefaultSchema(
+      JdbcConnection connection, List<TableName> tableNames, SchemaWithName 
defaultSchema) {
+    Set<String> topLevelSchemas = 
connection.getRootSchema().getSubSchemaNames();
+
+    List<TableName> simpleIdentifiers =
+        tableNames.stream().filter(TableName::isSimple).collect(toList());
+
+    List<TableName> withoutMatchingSchemas =
+        tableNames.stream()
+            .filter(name -> name.isCompound() && 
!topLevelSchemas.contains(name.getPrefix()))
+            .collect(toList());
+
+    List<TableName> explicitlyInDefaulSchema =
+        tableNames.stream()
+            .filter(name -> name.isCompound() && 
name.getPrefix().equals(defaultSchema.name))
+            .map(TableName::removePrefix)
+            .collect(toList());
+
+    List<TableName> shouldGoIntoDefaultSchema =
+        ImmutableList.<TableName>builder()
+            .addAll(simpleIdentifiers)
+            .addAll(withoutMatchingSchemas)
+            .addAll(explicitlyInDefaulSchema)
+            .build();
+
+    
defaultSchema.getCustomTableResolver().registerKnownTableNames(shouldGoIntoDefaultSchema);
+  }
+
+  /**
+   * Register compound table identifiers with the matching custom resolvers 
that correspond to the
+   * top-level schemas.
+   */
+  private static void registerWithTopLevelSchemas(
+      JdbcConnection connection, List<TableName> tableNames) {
+
+    Map<String, CustomTableResolver> topLevelResolvers = 
getCustomTopLevelResolvers(connection);
+
+    topLevelResolvers.forEach(
+        (topLevelSchemaName, resolver) ->
+            resolver.registerKnownTableNames(tablesForSchema(tableNames, 
topLevelSchemaName)));
+  }
+
+  /** Get the custom schema resolvers for all top-level schemas that support 
custom resolution. */
+  private static Map<String, CustomTableResolver> getCustomTopLevelResolvers(
+      JdbcConnection connection) {
+    return connection.getRootSchema().getSubSchemaNames().stream()
+        .map(topLevelSchemaName -> SchemaWithName.create(connection, 
topLevelSchemaName))
+        .filter(schema -> 
!schema.getName().equals(getCurrentSchemaName(connection)))
+        .filter(SchemaWithName::supportsCustomResolution)
+        .collect(toMap(SchemaWithName::getName, 
SchemaWithName::getCustomTableResolver));
+  }
+
+  /**
+   * Get the compound identifiers that have the first component matching the 
given top-level schema
+   * name and remove the first component.
+   */
+  private static List<TableName> tablesForSchema(
+      List<TableName> tableNames, String topLevelSchema) {
+    return tableNames.stream()
+        .filter(TableName::isCompound)
+        .filter(t -> t.getPrefix().equals(topLevelSchema))
+        .map(TableName::removePrefix)
+        .collect(toList());
+  }
+
+  /**
+   * A utility class that keeps track of schema name and other properties.
+   *
+   * <p>Sole purpose is to reduce inline boilerplate and encapsulate stuff.
+   */
+  private static class SchemaWithName {
+    String name;
+    org.apache.calcite.schema.Schema schema;
+
+    static SchemaWithName create(JdbcConnection connection, String name) {
+      SchemaWithName schemaWithName = new SchemaWithName();
+      schemaWithName.name = name;
+      schemaWithName.schema =
+          
CalciteSchema.from(connection.getRootSchema().getSubSchema(name)).schema;
+      return schemaWithName;
+    }
+
+    /** Whether this schema/table provider supports custom table resolution. */
+    boolean supportsCustomResolution() {
+      return isBeamSchema() && tableProviderSupportsCustomResolution();
+    }
+
+    /** Whether this Calcite schema is actually an instance of 
BeamCalciteSchema. */
+    boolean isBeamSchema() {
+      return schema instanceof BeamCalciteSchema;
+    }
+
+    /** Whether the table provider is an instance of CustomTableResolver. */
+    boolean tableProviderSupportsCustomResolution() {
+      return getTableProvider() instanceof CustomTableResolver;
+    }
+
+    /** Gets the table provider that backs the BeamCalciteSchema. */
+    TableProvider getTableProvider() {
+      checkState(isBeamSchema());
+      return ((BeamCalciteSchema) schema).getTableProvider();
+    }
+
+    /** Schema name. */
+    String getName() {
+      return name;
+    }
+
+    /** Custom table resolver in the provider. */
+    CustomTableResolver getCustomTableResolver() {
+      checkState(supportsCustomResolution());
+      return (CustomTableResolver) getTableProvider();
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolver.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolver.java
new file mode 100644
index 0000000..fc066bd
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+
+/**
+ * Interface that table providers can implement if they require custom table 
name resolution.
+ *
+ * <p>{@link #registerKnownTableNames(List)} is called by the parser/planner 
and takes the list of
+ * all tables mentioned in the query. Then when normal Calcite lifecycle is 
executed the table
+ * provider can now check against this list and perform custom resolution. 
This is a workaround for
+ * lack of context in Calcite's logic, e.g. it's impossible to receive the 
whole table name at once,
+ * or understand that it has done querying sub-schemas and expects a table.
+ */
+public interface CustomTableResolver extends TableProvider {
+
+  /**
+   * Register the table names as extracted from the FROM clause.
+   *
+   * <p>Calcite doesn't provide these full names to table providers and 
queries them with individual
+   * parts of the identifiers without giving any extra context. So if a table 
provider needs to
+   * implement some custom table name resolution strategy it doesn't have 
information to do so. E.g.
+   * if you want to take the compound SQL identifiers that were originally 
split by dots, join them
+   * into a single string, and then query a back-end service, this interface 
makes this possible.
+   */
+  void registerKnownTableNames(List<TableName> tableNames);
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteCannotParseSimpleIdentifiersTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteCannotParseSimpleIdentifiersTest.java
new file mode 100644
index 0000000..c696c28
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteCannotParseSimpleIdentifiersTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.junit.Assert.assertThrows;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.ParseException;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.function.ThrowingRunnable;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Examples of simple identifiers that Calcite is unable to parse.
+ *
+ * <p>Not an exhaustive list.
+ */
+@RunWith(Parameterized.class)
+public class CalciteCannotParseSimpleIdentifiersTest implements Serializable {
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  private final String input;
+
+  @Parameters(name = "{0}")
+  public static Iterable<Object> data() {
+    return Arrays.asList(
+        new Object[] {
+          "field id",
+          "field\nid",
+          "`field\nid`",
+          "field`id",
+          "field\\id",
+          "field``id",
+          "field\bid",
+          "field=id",
+          "field+id",
+          "field{id}",
+          "field.id",
+          "field\r_id",
+          "`field\r_id`"
+        });
+  }
+
+  public CalciteCannotParseSimpleIdentifiersTest(String input) {
+    this.input = input;
+  }
+
+  @Test
+  public void testFailsToParseAlias() {
+    assertThrows(ParseException.class, attemptParse(input));
+  }
+
+  private ThrowingRunnable attemptParse(String alias) {
+    return () -> BeamSqlEnv.inMemory().isDdl(String.format("SELECT 321 AS %s", 
alias));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteParsesSimpleIdentifiersTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteParsesSimpleIdentifiersTest.java
new file mode 100644
index 0000000..2532c02
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteParsesSimpleIdentifiersTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Examples of simple identifiers that Calcite is able to parse.
+ *
+ * <p>Not an exhaustive list.
+ */
+@RunWith(Parameterized.class)
+public class CalciteParsesSimpleIdentifiersTest implements Serializable {
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  private final String input;
+  private final String expected;
+
+  @Parameters(name = "{0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(
+        new Object[][] {
+          // --------------------------------
+          // user input    |    parsed as  |
+          // --------------------------------
+          {"field_id", "field_id"},
+          {"`field_id`", "field_id"},
+          {"`field``id`", "field`id"},
+          {"`field id`", "field id"},
+          {"`field-id`", "field-id"},
+          {"`field=id`", "field=id"},
+          {"`field.id`", "field.id"},
+          {"`field{id}`", "field{id}"},
+          {"`field|id`", "field|id"},
+          {"`field\\id`", "field\\id"},
+          {"`field\\a_id`", "field\\a_id"},
+          {"`field\b_id`", "field\b_id"},
+          {"`field\\b_id`", "field\\b_id"},
+          {"`field\\f_id`", "field\\f_id"},
+          {"`field\\n_id`", "field\\n_id"},
+          {"`field\\r_id`", "field\\r_id"},
+          {"`field\tid`", "field\tid"},
+          {"`field\\t_id`", "field\\t_id"},
+          {"`field\\v_id`", "field\\v_id"},
+          {"`field\\\\_id`", "field\\\\_id"},
+          {"`field\\?_id`", "field\\?_id"}
+        });
+  }
+
+  public CalciteParsesSimpleIdentifiersTest(String input, String expected) {
+    this.input = input;
+    this.expected = expected;
+  }
+
+  @Test
+  public void testParsesAlias() {
+    assertThat(alias(input), parsedAs(expected));
+  }
+
+  /** PCollection with a single row with a single field with the specified 
alias. */
+  private PCollection<Row> alias(String alias) {
+    return pipeline.apply(SqlTransform.query(String.format("SELECT 321 AS %s", 
alias)));
+  }
+
+  /**
+   * Asserts that the specified field alias is parsed as expected.
+   *
+   * <p>SQL parser un-escapes the qouted identifiers, for example.
+   */
+  private Matcher<PCollection<Row>> parsedAs(String expected) {
+    return new BaseMatcher<PCollection<Row>>() {
+      @Override
+      public boolean matches(Object actual) {
+        PCollection<Row> result = (PCollection<Row>) actual;
+        PAssert.thatSingleton(result).satisfies(assertFieldNameIs(expected));
+        pipeline.run();
+        return true;
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("field alias matches");
+      }
+    };
+  }
+
+  /** Assert that field name of the only field matches the expected value. */
+  private SerializableFunction<Row, Void> assertFieldNameIs(String expected) {
+    return row -> {
+      assertEquals(expected, onlyField(row).getName());
+      return null;
+    };
+  }
+
+  /** Returns the only field in the row. */
+  private Schema.Field onlyField(Row row) {
+    assertEquals(1, row.getFieldCount());
+    return row.getSchema().getField(0);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java
new file mode 100644
index 0000000..484e031
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** CustomTableResolverTest. */
+public class CustomTableResolverTest implements Serializable {
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  private static final Schema BASIC_SCHEMA =
+      Schema.builder().addInt32Field("id").addStringField("name").build();
+
+  /**
+   * Test table provider with custom name resolution.
+   *
+   * <p>Demonstrates how to parse table names as in normal Calcite queries 
syntax, e.g. {@code
+   * a.b.c.d} and convert them to its' own custom table name format {@code 
a_b_c_d}.
+   */
+  public static class CustomResolutionTestTableProvider extends 
TestTableProvider
+      implements CustomTableResolver {
+
+    List<TableName> parsedTableNames = null;
+
+    @Override
+    public void registerKnownTableNames(List<TableName> tableNames) {
+      parsedTableNames = tableNames;
+    }
+
+    @Override
+    public TableProvider getSubProvider(String name) {
+      // TODO: implement with trie
+
+      // If 'name' matches a sub-schema/sub-provider we start tracking
+      // the subsequent calls to getSubProvider().
+      //
+      // Simple table ids and final table lookup
+      //
+      // If there is no matching sub-schema then returning null from here 
indicates
+      // that 'name' is either not part of this schema or it's a table, not a 
sub-schema,
+      // this will be checked right after this in a getTable() call.
+      //
+      // Because this is a getSubProvider() call it means Calcite expects
+      // the sub-schema/sub-provider to be returned, not a table,
+      // so we only need to check against known compound table identifiers.
+      // If 'name' acutally represents a simple identifier then it will be 
checked
+      // in a 'getTable()' call later. Unless there's the same sub-provider 
name,
+      // in which case it's a conflict and we will use the sub-schema and not 
assume it's a table.
+      // Calcite does the same.
+      //
+      // Here we find if there are any parsed tables that start from 'name' 
that belong to this
+      // table provider.
+      // We then create a fake tracking provider that in a trie-manner collects
+      // getSubProvider()/getTable() calls by checking whether there are known 
parsed table names
+      // matching what Calcite asks us for.
+      List<TableName> tablesToLookFor =
+          parsedTableNames.stream()
+              .filter(TableName::isCompound)
+              .filter(tableName -> tableName.getPrefix().equals(name))
+              .collect(toList());
+
+      return tablesToLookFor.size() > 0 ? new TableNameTrackingProvider(1, 
tablesToLookFor) : null;
+    }
+
+    class TableNameTrackingProvider extends TestTableProvider {
+      int schemaLevel;
+      List<TableName> tableNames;
+
+      TableNameTrackingProvider(int schemaLevel, List<TableName> tableNames) {
+        this.schemaLevel = schemaLevel;
+        this.tableNames = tableNames;
+      }
+
+      @Override
+      public TableProvider getSubProvider(String name) {
+        // Find if any of the parsed table names have 'name' as part
+        // of their path at current index.
+        //
+        // If there are, return a new tracking provider for such tables and 
incremented index.
+        //
+        // If there are none, it means something weird has happened and 
returning null
+        // will make Calcite try other schemas. Maybe things will work out.
+        //
+        // However since we originally register all parsed table names for the 
given schema
+        // in this provider we should only receive a getSubProvider() call for 
something unknown
+        // when it's a leaf path element, i.e. actual table name, which will 
be handled in
+        // getTable() call.
+        List<TableName> matchingTables =
+            tableNames.stream()
+                .filter(TableName::isCompound)
+                .filter(tableName -> tableName.getPath().size() > schemaLevel)
+                .filter(tableName -> 
tableName.getPath().get(schemaLevel).equals(name))
+                .collect(toList());
+
+        return matchingTables.size() > 0
+            ? new TableNameTrackingProvider(schemaLevel + 1, matchingTables)
+            : null;
+      }
+
+      @Nullable
+      @Override
+      public Table getTable(String name) {
+
+        // This is called only after getSubProvider() returned null,
+        // and since we are tracking the actual parsed table names, this should
+        // be it, there should exist a parsed table that matches the 'name'.
+
+        Optional<TableName> matchingTable =
+            tableNames.stream()
+                .filter(tableName -> tableName.getTableName().equals(name))
+                .findFirst();
+
+        TableName tableName =
+            matchingTable.orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        "Unexpected table '"
+                            + name
+                            + "' requested. Current schema level is "
+                            + schemaLevel
+                            + ". Current known table names: "
+                            + tableNames.toString()));
+        // For test we register tables with underscore instead of dots, so 
here we lookup the tables
+        // with those underscore
+        String actualTableName =
+            String.join("_", tableName.getPath()) + "_" + 
tableName.getTableName();
+        return 
CustomResolutionTestTableProvider.this.getTable(actualTableName);
+      }
+
+      @Override
+      public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
+        return CustomResolutionTestTableProvider.this.buildBeamSqlTable(table);
+      }
+    }
+  }
+
+  @Test
+  public void testSimpleId() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM testtable")
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSimpleIdWithExplicitDefaultSchema() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM testprovider.testtable")
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSimpleIdWithExplicitDefaultSchemaWithMultipleProviders() 
throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM testprovider2.testtable2")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSimpleIdWithExplicitNonDefaultSchema() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM testprovider2.testtable2")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testCompoundIdInDefaultSchema() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM testtable.blah")
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testCompoundIdInExplicitDefaultSchema() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM 
testprovider.testtable.blah")
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testLongCompoundIdInDefaultSchema() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar")
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testLongCompoundIdInDefaultSchemaWithMultipleProviders() throws 
Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, 
"four"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testLongCompoundIdInExplicitDefaultSchema() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM 
testprovider.testtable.blah.foo.bar")
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testLongCompoundIdInNonDefaultSchemaSameTableNames() throws 
Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, 
"four"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM 
testprovider2.testtable.blah.foo.bar")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testLongCompoundIdInNonDefaultSchemaDifferentNames() throws 
Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, 
"two"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        Table.builder()
+            .name("testtable2_blah2_foo2_bar2")
+            .schema(BASIC_SCHEMA)
+            .type("test")
+            .build());
+    tableProvider2.addRows("testtable2_blah2_foo2_bar2", row(3, "three"), 
row(4, "four"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query("SELECT id, name FROM 
testprovider2.testtable2.blah2.foo2.bar2")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testJoinWithLongCompoundIds() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query(
+                    "SELECT testprovider2.testtable.blah.foo.bar2.id, 
testtable.blah.foo.bar.name \n"
+                        + "FROM \n"
+                        + "  testprovider2.testtable.blah.foo.bar2 \n"
+                        + "JOIN \n"
+                        + "  testtable.blah.foo.bar \n"
+                        + "USING(name)")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, 
"nobody"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testInnerJoinWithLongCompoundIds() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query(
+                    "SELECT testprovider2.testtable.blah.foo.bar2.id, 
testtable.blah.foo.bar.name \n"
+                        + "FROM \n"
+                        + "  testprovider2.testtable.blah.foo.bar2 \n"
+                        + "JOIN \n"
+                        + "  testtable.blah.foo.bar \n"
+                        + "USING(name)")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, 
"nobody"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testJoinWithLongCompoundIdsWithAliases() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query(
+                    "SELECT b.id, a.name \n"
+                        + "FROM \n"
+                        + "  testprovider2.testtable.blah.foo.bar2 AS b \n"
+                        + "JOIN \n"
+                        + "  testtable.blah.foo.bar a\n"
+                        + "USING(name)")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, 
"nobody"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testUnionWithLongCompoundIds() throws Exception {
+    TestTableProvider tableProvider = new CustomResolutionTestTableProvider();
+    tableProvider.createTable(
+        
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, 
"nobody"));
+
+    TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
+    tableProvider2.createTable(
+        
Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
+    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), 
row(1, "nobody"));
+
+    PCollection<Row> result =
+        pipeline.apply(
+            SqlTransform.query(
+                    "SELECT id, name \n"
+                        + "FROM \n"
+                        + "  testprovider2.testtable.blah.foo.bar2 \n"
+                        + "UNION \n"
+                        + "    SELECT id, name \n"
+                        + "      FROM \n"
+                        + "        testtable.blah.foo.bar \n")
+                .withTableProvider("testprovider2", tableProvider2)
+                .withDefaultTableProvider("testprovider", tableProvider));
+
+    PAssert.that(result)
+        .containsInAnyOrder(
+            row(4, "customer"), row(1, "nobody"), row(3, "customer"), row(2, 
"nobody"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  private Row row(int id, String name) {
+    return Row.withSchema(BASIC_SCHEMA).addValues(id, name).build();
+  }
+}

Reply via email to