This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 989c928 [SQL] Support complex identifiers in DataCatalog new fd67fd3 Merge pull request #9353 from akedin/datacatalog-custom-name-resolution 989c928 is described below commit 989c928f87b1e195d06862414b3ed6545e60ab2f Author: akedin <ke...@google.com> AuthorDate: Thu Aug 15 12:09:39 2019 -0700 [SQL] Support complex identifiers in DataCatalog --- build.gradle | 1 + sdks/java/extensions/sql/datacatalog/build.gradle | 36 +++- .../datacatalog/DataCatalogTableProvider.java | 24 ++- .../meta/provider/datacatalog/ZetaSqlIdUtils.java | 79 ++++++++ .../datacatalog/DataCatalogBigQueryIT.java | 98 ++++++++++ .../provider/datacatalog/ZetaSqlIdUtilsTest.java | 64 +++++++ .../extensions/sql/impl/TableResolutionUtils.java | 4 +- .../sql/meta/provider/FullNameTableProvider.java | 166 +++++++++++++++++ .../sql/meta/CustomTableResolverTest.java | 206 +++++++-------------- .../beam/sdk/io/gcp/bigquery/TestBigQuery.java | 4 + 10 files changed, 538 insertions(+), 144 deletions(-) diff --git a/build.gradle b/build.gradle index d7e7c06..fd0c74a 100644 --- a/build.gradle +++ b/build.gradle @@ -158,6 +158,7 @@ task javaPostCommit() { task sqlPostCommit() { dependsOn ":sdks:java:extensions:sql:postCommit" dependsOn ":sdks:java:extensions:sql:jdbc:postCommit" + dependsOn ":sdks:java:extensions:sql:datacatalog:postCommit" } task javaPostCommitPortabilityApi () { diff --git a/sdks/java/extensions/sql/datacatalog/build.gradle b/sdks/java/extensions/sql/datacatalog/build.gradle index b9bff77..530c2ec 100644 --- a/sdks/java/extensions/sql/datacatalog/build.gradle +++ b/sdks/java/extensions/sql/datacatalog/build.gradle @@ -1,5 +1,3 @@ -import groovy.json.JsonOutput - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,6 +16,8 @@ import groovy.json.JsonOutput * limitations under the License. 
*/ +import groovy.json.JsonOutput + plugins { id 'org.apache.beam.module' } applyJavaNature() @@ -59,3 +59,35 @@ task runDataCatalogExample(type: JavaExec) { "--tempLocation=${gcsTempRoot}", ] } + + +task integrationTest(type: Test) { + group = "Verification" + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/' + + // Disable Gradle cache (it should not be used because the IT's won't run). + outputs.upToDateWhen { false } + + def pipelineOptions = [ + "--project=${gcpProject}", + "--tempLocation=${gcsTempRoot}", + "--blockOnRun=false"] + + systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) + + include '**/*IT.class' + maxParallelForks 4 + classpath = project(":sdks:java:extensions:sql:datacatalog") + .sourceSets + .test + .runtimeClasspath + testClassesDirs = files(project(":sdks:java:extensions:sql:datacatalog").sourceSets.test.output.classesDirs) + useJUnit {} +} + +task postCommit { + group = "Verification" + description = "Various integration tests" + dependsOn integrationTest +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java index 1ca0959..3fa8594 100644 --- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java @@ -27,16 +27,19 @@ import java.util.Map; import java.util.stream.Stream; import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.TableName; 
import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** Uses DataCatalog to get the source type and schema for a table. */ -public class DataCatalogTableProvider implements TableProvider { +public class DataCatalogTableProvider extends FullNameTableProvider { private Map<String, TableProvider> delegateProviders; private Map<String, Table> tableCache; @@ -92,8 +95,23 @@ public class DataCatalogTableProvider implements TableProvider { } @Override - public @Nullable Table getTable(String tableName) { - return loadTable(tableName); + public @Nullable Table getTable(String tableNamePart) { + throw new UnsupportedOperationException( + "Loading a table by partial name '" + tableNamePart + "' is unsupported"); + } + + @Override + public @Nullable Table getTableByFullName(TableName fullTableName) { + + ImmutableList<String> allNameParts = + ImmutableList.<String>builder() + .addAll(fullTableName.getPath()) + .add(fullTableName.getTableName()) + .build(); + + String fullEscapedTableName = ZetaSqlIdUtils.escapeAndJoin(allNameParts); + + return loadTable(fullEscapedTableName); } private @Nullable Table loadTable(String tableName) { diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtils.java 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtils.java new file mode 100644 index 0000000..eac82c4 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import static java.util.stream.Collectors.joining; + +import java.util.List; +import java.util.regex.Pattern; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Utils to work with ZetaSQL-compatible IDs. */ +class ZetaSqlIdUtils { + + /** + * Some special characters we explicitly handle. + * + * <p>Everything else is ignored, e.g. tabs, newlines, etc. + */ + private static final Pattern SPECIAL_CHARS_ESCAPE = + Pattern.compile( + "(?<SpecialChar>[" + + "\\\\" // slash + + "`" // backtick + + "'" // single quote + + "\"" // double quote + + "?" 
// question mark + + "])"); + + private static final ImmutableMap<String, String> WHITESPACES = + ImmutableMap.of( + "\n", "\\\\n", + "\t", "\\\\t", + "\r", "\\\\r", + "\f", "\\\\f"); + + private static final Pattern SIMPLE_ID = Pattern.compile("[A-Za-z_][A-Za-z_0-9]*"); + + /** + * Joins parts into a single compound ZetaSQL identifier. + * + * <p>Escapes backticks, slashes, double and single quotes, doesn't handle other special + * characters for now. + */ + static String escapeAndJoin(List<String> parts) { + return parts.stream() + .map(ZetaSqlIdUtils::escapeSpecialChars) + .map(ZetaSqlIdUtils::replaceWhitespaces) + .map(ZetaSqlIdUtils::backtickIfNeeded) + .collect(joining(".")); + } + + private static String escapeSpecialChars(String str) { + return SPECIAL_CHARS_ESCAPE.matcher(str).replaceAll("\\\\${SpecialChar}"); + } + + private static String replaceWhitespaces(String s) { + return WHITESPACES.keySet().stream() + .reduce(s, (str, whitespace) -> str.replaceAll(whitespace, WHITESPACES.get(whitespace))); + } + + private static String backtickIfNeeded(String s) { + return SIMPLE_ID.matcher(s).matches() ? s : ("`" + s + "`"); + } +} diff --git a/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java new file mode 100644 index 0000000..4c0d6e9 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; +import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for DataCatalog+BigQuery. 
*/ +@RunWith(JUnit4.class) +public class DataCatalogBigQueryIT { + + private static final Schema ID_NAME_SCHEMA = + Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build(); + + @Rule public transient TestPipeline writeToBQPipeline = TestPipeline.create(); + @Rule public transient TestPipeline readPipeline = TestPipeline.create(); + @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(ID_NAME_SCHEMA); + + @Test + public void testReadWrite() throws Exception { + createBQTableWith( + new TableRow().set("id", 1).set("name", "name1"), + new TableRow().set("id", 2).set("name", "name2"), + new TableRow().set("id", 3).set("name", "name3")); + + TableReference bqTable = bigQuery.tableReference(); + String tableId = + String.format( + "bigquery.`table`.`%s`.`%s`.`%s`", + bqTable.getProjectId(), bqTable.getDatasetId(), bqTable.getTableId()); + + PCollection<Row> result = + readPipeline.apply( + "query", + SqlTransform.query("SELECT id, name FROM " + tableId) + .withDefaultTableProvider( + "datacatalog", DataCatalogTableProvider.create(readPipeline.getOptions()))); + + PAssert.that(result).containsInAnyOrder(row(1, "name1"), row(2, "name2"), row(3, "name3")); + readPipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + private Row row(long id, String name) { + return Row.withSchema(ID_NAME_SCHEMA).addValues(id, name).build(); + } + + private void createBQTableWith(TableRow r1, TableRow r2, TableRow r3) { + writeToBQPipeline + .apply(Create.of(r1, r2, r3).withCoder(TableRowJsonCoder.of())) + .apply( + BigQueryIO.writeTableRows() + .to(bigQuery.tableSpec()) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("id").setType("INTEGER"), + new TableFieldSchema().setName("name").setType("STRING")))) + .withoutValidation()); + writeToBQPipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } +} diff --git 
a/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtilsTest.java b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtilsTest.java new file mode 100644 index 0000000..ff34ef8 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtilsTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +/** Unit tests for {@link ZetaSqlIdUtils}. 
*/ +public class ZetaSqlIdUtilsTest { + + @Test + public void testHandlesSimpleIds() { + List<String> id = Arrays.asList("aaa", "BbB", "zAzzz00"); + assertEquals("aaa.BbB.zAzzz00", ZetaSqlIdUtils.escapeAndJoin(id)); + } + + @Test + public void testHandlesMixedIds() { + List<String> id = Arrays.asList("aaa", "Bb---B", "zAzzz00"); + assertEquals("aaa.`Bb---B`.zAzzz00", ZetaSqlIdUtils.escapeAndJoin(id)); + } + + @Test + public void testHandlesSpecialChars() { + List<String> id = Arrays.asList("a\\a", "b`b", "c'c", "d\"d", "e?e"); + assertEquals("`a\\\\a`.`b\\`b`.`c\\'c`.`d\\\"d`.`e\\?e`", ZetaSqlIdUtils.escapeAndJoin(id)); + } + + @Test + public void testHandlesSpecialCharsInOnePart() { + List<String> id = Arrays.asList("a\\ab`bc'cd\"de?e"); + assertEquals("`a\\\\ab\\`bc\\'cd\\\"de\\?e`", ZetaSqlIdUtils.escapeAndJoin(id)); + } + + @Test + public void testHandlesWhiteSpaces() { + List<String> id = Arrays.asList("a\na", "b\tb", "c\rc", "d\fd"); + assertEquals("`a\\na`.`b\\tb`.`c\\rc`.`d\\fd`", ZetaSqlIdUtils.escapeAndJoin(id)); + } + + @Test + public void testHandlesWhiteSpacesInOnePart() { + List<String> id = Arrays.asList("a\nab\tbc\rcd\fd"); + assertEquals("`a\\nab\\tbc\\rcd\\fd`", ZetaSqlIdUtils.escapeAndJoin(id)); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java index af6146b..247f1f7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java @@ -34,8 +34,8 @@ import org.apache.calcite.sql.SqlNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** TableResolutionUtils. */ -public class TableResolutionUtils { +/** Utils to wire up the custom table resolution into Calcite's planner. 
*/ +class TableResolutionUtils { private static final Logger LOG = LoggerFactory.getLogger(TableResolutionUtils.class); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java new file mode 100644 index 0000000..066e779 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import static java.util.stream.Collectors.toList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.TableName; +import org.apache.beam.sdk.extensions.sql.meta.CustomTableResolver; +import org.apache.beam.sdk.extensions.sql.meta.Table; + +/** + * Base class for table providers that look up table metadata using full table names, instead of + * querying it by parts of the name separately. + */ +@Experimental +public abstract class FullNameTableProvider implements TableProvider, CustomTableResolver { + + private List<TableName> knownTables; + + protected FullNameTableProvider() { + knownTables = new ArrayList<>(); + } + + public abstract Table getTableByFullName(TableName fullTableName); + + @Override + public void registerKnownTableNames(List<TableName> tableNames) { + knownTables.addAll(tableNames); + } + + @Override + public TableProvider getSubProvider(String name) { + // TODO: implement with trie + + // If 'name' matches a sub-schema/sub-provider we start tracking + // the subsequent calls to getSubProvider(). + // + // Simple table ids and final table lookup + // + // If there is no matching sub-schema then returning null from here indicates + // that 'name' is either not part of this schema or it's a table, not a sub-schema, + // this will be checked right after this in a getTable() call. + // + // Because this is a getSubProvider() call it means Calcite expects + // the sub-schema/sub-provider to be returned, not a table, + // so we only need to check against known compound table identifiers. + // If 'name' actually represents a simple identifier then it will be checked + // in a 'getTable()' call later.
Unless there's the same sub-provider name, + // in which case it's a conflict and we will use the sub-schema and not assume it's a table. + // Calcite does the same. + // + // Here we find if there are any parsed tables that start from 'name' that belong to this + // table provider. + // We then create a fake tracking provider that in a trie-manner collects + // getSubProvider()/getTable() calls by checking whether there are known parsed table names + // matching what Calcite asks us for. + List<TableName> tablesToLookFor = + knownTables.stream() + .filter(TableName::isCompound) + .filter(tableName -> tableName.getPrefix().equals(name)) + .collect(toList()); + + return tablesToLookFor.size() > 0 ? new TableNameTrackingProvider(1, tablesToLookFor) : null; + } + + /** + * Calcite calls getSubProvider()/getTable() on this class when resolving a table name. This class + * keeps track of these calls and checks against known table names (extracted from a query), so + * that when a full table name is parsed out it calls the actual table provider to get a table + * based on the full name, instead of calling it component by component. + * + * <p>This class enables table providers to query their metadata source using full table names. + */ + class TableNameTrackingProvider extends InMemoryMetaTableProvider { + int schemaLevel; + List<TableName> tableNames; + + TableNameTrackingProvider(int schemaLevel, List<TableName> tableNames) { + this.schemaLevel = schemaLevel; + this.tableNames = tableNames; + } + + @Override + public TableProvider getSubProvider(String name) { + // Find if any of the parsed table names have 'name' as part + // of their path at current index. + // + // If there are, return a new tracking provider for such tables and incremented index. + // + // If there are none, it means something weird has happened and returning null + // will make Calcite try other schemas. Maybe things will work out.
+ // + // However since we originally register all parsed table names for the given schema + // in this provider we should only receive a getSubProvider() call for something unknown + // when it's a leaf path element, i.e. actual table name, which will be handled in + // getTable() call. + List<TableName> matchingTables = + tableNames.stream() + .filter(TableName::isCompound) + .filter(tableName -> tableName.getPath().size() > schemaLevel) + .filter(tableName -> tableName.getPath().get(schemaLevel).equals(name)) + .collect(toList()); + + return matchingTables.size() > 0 + ? new TableNameTrackingProvider(schemaLevel + 1, matchingTables) + : null; + } + + @Override + public String getTableType() { + return "google.cloud.datacatalog.subprovider"; + } + + @Nullable + @Override + public Table getTable(String name) { + + // This is called only after getSubProvider() returned null, + // and since we are tracking the actual parsed table names, this should + // be it, there should exist a parsed table that matches the 'name'. + + Optional<TableName> matchingTable = + tableNames.stream() + .filter(tableName -> tableName.getTableName().equals(name)) + .findFirst(); + + TableName fullTableName = + matchingTable.orElseThrow( + () -> + new IllegalStateException( + "Unexpected table '" + + name + + "' requested. Current schema level is " + + schemaLevel + + ". 
Current known table names: " + + tableNames.toString())); + return FullNameTableProvider.this.getTableByFullName(fullTableName); + } + + @Override + public synchronized BeamSqlTable buildBeamSqlTable(Table table) { + return FullNameTableProvider.this.buildBeamSqlTable(table); + } + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java index 484e031..bd70391 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java @@ -17,16 +17,12 @@ */ package org.apache.beam.sdk.extensions.sql.meta; -import static java.util.stream.Collectors.toList; - import java.io.Serializable; -import java.util.List; -import java.util.Optional; -import javax.annotation.Nullable; +import java.util.Map; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.SqlTransform; import org.apache.beam.sdk.extensions.sql.impl.TableName; -import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; @@ -37,7 +33,7 @@ import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; -/** CustomTableResolverTest. */ +/** Test for custom table resolver and full name table provider. 
*/ public class CustomTableResolverTest implements Serializable { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -51,126 +47,62 @@ public class CustomTableResolverTest implements Serializable { * <p>Demonstrates how to parse table names as in normal Calcite queries syntax, e.g. {@code * a.b.c.d} and convert them to its' own custom table name format {@code a_b_c_d}. */ - public static class CustomResolutionTestTableProvider extends TestTableProvider - implements CustomTableResolver { + public static class CustomResolutionTestTableProvider extends FullNameTableProvider { + + TestTableProvider delegateTableProvider; + + public CustomResolutionTestTableProvider() { + delegateTableProvider = new TestTableProvider(); + } + + @Override + public Table getTable(String tableName) { + return delegateTableProvider.getTable(tableName); + } + + @Override + public Table getTableByFullName(TableName fullTableName) { + // For the test we register tables with underscore instead of dots, so here we lookup the + // tables + // with those underscore. + String actualTableName = + String.join("_", fullTableName.getPath()) + "_" + fullTableName.getTableName(); + return delegateTableProvider.getTable(actualTableName); + } - List<TableName> parsedTableNames = null; + @Override + public String getTableType() { + return delegateTableProvider.getTableType(); + } @Override - public void registerKnownTableNames(List<TableName> tableNames) { - parsedTableNames = tableNames; + public void createTable(Table table) { + delegateTableProvider.createTable(table); + } + + public void addRows(String tableName, Row... rows) { + delegateTableProvider.addRows(tableName, rows); } @Override - public TableProvider getSubProvider(String name) { - // TODO: implement with trie - - // If 'name' matches a sub-schema/sub-provider we start tracking - // the subsequent calls to getSubProvider(). 
- // - // Simple table ids and final table lookup - // - // If there is no matching sub-schema then returning null from here indicates - // that 'name' is either not part of this schema or it's a table, not a sub-schema, - // this will be checked right after this in a getTable() call. - // - // Because this is a getSubProvider() call it means Calcite expects - // the sub-schema/sub-provider to be returned, not a table, - // so we only need to check against known compound table identifiers. - // If 'name' acutally represents a simple identifier then it will be checked - // in a 'getTable()' call later. Unless there's the same sub-provider name, - // in which case it's a conflict and we will use the sub-schema and not assume it's a table. - // Calcite does the same. - // - // Here we find if there are any parsed tables that start from 'name' that belong to this - // table provider. - // We then create a fake tracking provider that in a trie-manner collects - // getSubProvider()/getTable() calls by checking whether there are known parsed table names - // matching what Calcite asks us for. - List<TableName> tablesToLookFor = - parsedTableNames.stream() - .filter(TableName::isCompound) - .filter(tableName -> tableName.getPrefix().equals(name)) - .collect(toList()); - - return tablesToLookFor.size() > 0 ? new TableNameTrackingProvider(1, tablesToLookFor) : null; + public void dropTable(String tableName) { + delegateTableProvider.dropTable(tableName); } - class TableNameTrackingProvider extends TestTableProvider { - int schemaLevel; - List<TableName> tableNames; - - TableNameTrackingProvider(int schemaLevel, List<TableName> tableNames) { - this.schemaLevel = schemaLevel; - this.tableNames = tableNames; - } - - @Override - public TableProvider getSubProvider(String name) { - // Find if any of the parsed table names have 'name' as part - // of their path at current index. - // - // If there are, return a new tracking provider for such tables and incremented index. 
- // - // If there are none, it means something weird has happened and returning null - // will make Calcite try other schemas. Maybe things will work out. - // - // However since we originally register all parsed table names for the given schema - // in this provider we should only receive a getSubProvider() call for something unknown - // when it's a leaf path element, i.e. actual table name, which will be handled in - // getTable() call. - List<TableName> matchingTables = - tableNames.stream() - .filter(TableName::isCompound) - .filter(tableName -> tableName.getPath().size() > schemaLevel) - .filter(tableName -> tableName.getPath().get(schemaLevel).equals(name)) - .collect(toList()); - - return matchingTables.size() > 0 - ? new TableNameTrackingProvider(schemaLevel + 1, matchingTables) - : null; - } - - @Nullable - @Override - public Table getTable(String name) { - - // This is called only after getSubProvider() returned null, - // and since we are tracking the actual parsed table names, this should - // be it, there should exist a parsed table that matches the 'name'. - - Optional<TableName> matchingTable = - tableNames.stream() - .filter(tableName -> tableName.getTableName().equals(name)) - .findFirst(); - - TableName tableName = - matchingTable.orElseThrow( - () -> - new IllegalStateException( - "Unexpected table '" - + name - + "' requested. Current schema level is " - + schemaLevel - + ". 
Current known table names: " - + tableNames.toString())); - // For test we register tables with underscore instead of dots, so here we lookup the tables - // with those underscore - String actualTableName = - String.join("_", tableName.getPath()) + "_" + tableName.getTableName(); - return CustomResolutionTestTableProvider.this.getTable(actualTableName); - } - - @Override - public synchronized BeamSqlTable buildBeamSqlTable(Table table) { - return CustomResolutionTestTableProvider.this.buildBeamSqlTable(table); - } + @Override + public Map<String, Table> getTables() { + return delegateTableProvider.getTables(); + } + + @Override + public BeamSqlTable buildBeamSqlTable(Table table) { + return delegateTableProvider.buildBeamSqlTable(table); } } @Test public void testSimpleId() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); @@ -187,7 +119,7 @@ public class CustomTableResolverTest implements Serializable { @Test public void testSimpleIdWithExplicitDefaultSchema() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); @@ -204,12 +136,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testSimpleIdWithExplicitDefaultSchemaWithMultipleProviders() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); 
tableProvider.createTable( Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four")); @@ -227,12 +159,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testSimpleIdWithExplicitNonDefaultSchema() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four")); @@ -250,7 +182,7 @@ public class CustomTableResolverTest implements Serializable { @Test public void testCompoundIdInDefaultSchema() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two")); @@ -267,7 +199,7 @@ public class CustomTableResolverTest implements Serializable { @Test public void 
testCompoundIdInExplicitDefaultSchema() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two")); @@ -284,7 +216,7 @@ public class CustomTableResolverTest implements Serializable { @Test public void testLongCompoundIdInDefaultSchema() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); @@ -301,12 +233,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testLongCompoundIdInDefaultSchemaWithMultipleProviders() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, "four")); @@ -324,7 +256,7 @@ public class CustomTableResolverTest implements Serializable { @Test public void testLongCompoundIdInExplicitDefaultSchema() 
throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); @@ -341,12 +273,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testLongCompoundIdInNonDefaultSchemaSameTableNames() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, "four")); @@ -364,12 +296,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testLongCompoundIdInNonDefaultSchemaDifferentNames() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new 
CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder() .name("testtable2_blah2_foo2_bar2") @@ -391,12 +323,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testJoinWithLongCompoundIds() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); @@ -420,12 +352,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testInnerJoinWithLongCompoundIds() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); @@ -449,12 +381,12 @@ public class CustomTableResolverTest 
implements Serializable { @Test public void testJoinWithLongCompoundIdsWithAliases() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); @@ -478,12 +410,12 @@ public class CustomTableResolverTest implements Serializable { @Test public void testUnionWithLongCompoundIds() throws Exception { - TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider(); tableProvider.createTable( Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); - TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); tableProvider2.createTable( Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java index 
f362fcd..4b4a97e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java @@ -189,6 +189,10 @@ public class TestBigQuery implements TestRule { table.getTableReference().getTableId()); } + public TableReference tableReference() { + return table.getTableReference(); + } + /** * Loads rows from BigQuery into {@link Row Rows} with given {@link Schema}. *