This is an automated email from the ASF dual-hosted git repository. mingmxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e981b43 [BEAM-3983] [SQL] Tables interface supports BigQuery (#4991) e981b43 is described below commit e981b439b6924dba20e7285091d343ae4d41765a Author: Andrew Pilloud <apill...@users.noreply.github.com> AuthorDate: Thu Apr 12 11:33:00 2018 -0700 [BEAM-3983] [SQL] Tables interface supports BigQuery (#4991) * [SQL] POutput interface instead of PDone type * [SQL] Location isn't necessarily a URI --- .../apache/beam/sdk/extensions/sql/BeamSqlTable.java | 4 ++-- .../sdk/extensions/sql/impl/parser/SqlCreateTable.java | 5 ++--- .../sql/impl/schema/BeamPCollectionTable.java | 4 ++-- .../org/apache/beam/sdk/extensions/sql/meta/Table.java | 13 ++----------- .../sql/meta/provider/kafka/BeamKafkaTable.java | 8 ++++---- .../sql/meta/provider/text/BeamTextCSVTable.java | 4 ++-- .../meta/provider/text/BeamTextCSVTableIOWriter.java | 6 +++--- .../sql/meta/provider/text/BeamTextTable.java | 3 +-- .../sql/meta/provider/text/TextTableProvider.java | 4 ++-- .../apache/beam/sdk/extensions/sql/BeamSqlCliTest.java | 8 ++++---- .../extensions/sql/impl/parser/BeamSqlParserTest.java | 17 ++++++----------- .../sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java | 4 ++-- .../sql/meta/provider/kafka/KafkaTableProviderTest.java | 3 +-- .../sql/meta/provider/text/TextTableProviderTest.java | 3 +-- .../sql/meta/store/InMemoryMetaStoreTest.java | 3 +-- .../sdk/extensions/sql/mock/MockedBoundedTable.java | 5 +++-- .../beam/sdk/extensions/sql/mock/MockedTable.java | 4 ++-- 17 files changed, 40 insertions(+), 58 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java index 5d484c9..6598ee3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; /** @@ -46,7 +46,7 @@ public interface BeamSqlTable { * create a {@code IO.write()} instance to write to target. * */ - PTransform<? super PCollection<Row>, PDone> buildIOWriter(); + PTransform<? super PCollection<Row>, POutput> buildIOWriter(); /** * Get the schema info of the table. diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java index 794e3e6..15e8b96 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.extensions.sql.impl.parser; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; -import java.net.URI; import java.util.List; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; @@ -109,8 +108,8 @@ public class SqlCreateTable extends SqlCall { return tblName.toString(); } - public URI location() { - return location == null ? null : URI.create(getString(location)); + public String location() { + return location == null ? null : getString(location); } public String type() { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java index 5a24f47..0b10b4c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; /** @@ -56,7 +56,7 @@ public class BeamPCollectionTable extends BaseBeamTable { } @Override - public PTransform<? super PCollection<Row>, PDone> buildIOWriter() { + public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target"); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java index 4af82a0..a3339af 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java @@ -21,7 +21,6 @@ package org.apache.beam.sdk.extensions.sql.meta; import com.alibaba.fastjson.JSONObject; import com.google.auto.value.AutoValue; import java.io.Serializable; -import java.net.URI; import java.util.List; import javax.annotation.Nullable; @@ -37,7 +36,7 @@ public abstract class Table implements Serializable { @Nullable public abstract String getComment(); @Nullable - public abstract URI getLocation(); + public abstract String getLocation(); @Nullable public abstract JSONObject getProperties(); @@ -45,14 +44,6 @@ public abstract class Table implements Serializable { return new org.apache.beam.sdk.extensions.sql.meta.AutoValue_Table.Builder(); } - public String getLocationAsString() { - if (getLocation() == null) { - return null; - } - - return "/" + getLocation().getHost() + getLocation().getPath(); - } - /** * Builder class for {@link Table}. */ @@ -62,7 +53,7 @@ public abstract class Table implements Serializable { public abstract Builder name(String name); public abstract Builder columns(List<Column> columns); public abstract Builder comment(String name); - public abstract Builder location(URI location); + public abstract Builder location(String location); public abstract Builder properties(JSONObject properties); public abstract Table build(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java index 4d31e4c..129e15e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; import static com.google.common.base.Preconditions.checkArgument; -import java.io.Serializable; import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; @@ -33,6 +32,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -43,7 +43,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}. * */ -public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { +public abstract class BeamKafkaTable extends BaseBeamTable { private String bootstrapServers; private List<String> topics; private List<TopicPartition> topicPartitions; @@ -109,11 +109,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab } @Override - public PTransform<? super PCollection<Row>, PDone> buildIOWriter() { + public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { checkArgument(topics != null && topics.size() == 1, "Only one topic can be acceptable as output."); - return new PTransform<PCollection<Row>, PDone>() { + return new PTransform<PCollection<Row>, POutput>() { @Override public PDone expand(PCollection<Row> input) { return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java index e407a4d..d0b5900 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.apache.commons.csv.CSVFormat; @@ -62,7 +62,7 @@ public class BeamTextCSVTable extends BeamTextTable { } @Override - public PTransform<? super PCollection<Row>, PDone> buildIOWriter() { + public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { return new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java index d32c9dfe..aea826d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java @@ -27,14 +27,14 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.apache.commons.csv.CSVFormat; /** * IOWriter for {@code BeamTextCSVTable}. */ -public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>, PDone> +public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>, POutput> implements Serializable { private String filePattern; protected Schema schema; @@ -49,7 +49,7 @@ public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>, PDone } @Override - public PDone expand(PCollection<Row> input) { + public POutput expand(PCollection<Row> input) { return input.apply("encodeRecord", ParDo.of(new DoFn<Row, String>() { @ProcessElement diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java index 841f4e2..0a5e9ee 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.text; -import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType; import org.apache.beam.sdk.schemas.Schema; @@ -26,7 +25,7 @@ import org.apache.beam.sdk.schemas.Schema; /** * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}). */ -public abstract class BeamTextTable extends BaseBeamTable implements Serializable { +public abstract class BeamTextTable extends BaseBeamTable { protected String filePattern; protected BeamTextTable(Schema schema, String filePattern) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java index 6a612c4..102069a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java @@ -40,7 +40,7 @@ import org.apache.commons.csv.CSVFormat; * ) * TYPE 'text' * COMMENT 'this is the table orders' - * LOCATION 'text://home/admin/orders' + * LOCATION '/home/admin/orders' * TBLPROPERTIES '{"format": "Excel"}' -- format of each text line(csv format) * }</pre> */ @@ -53,7 +53,7 @@ public class TextTableProvider implements TableProvider { @Override public BeamSqlTable buildBeamSqlTable(Table table) { Schema schema = getRowTypeFromTable(table); - String filePattern = table.getLocationAsString(); + String filePattern = table.getLocation(); CSVFormat format = CSVFormat.DEFAULT; JSONObject properties = table.getProperties(); String csvFormatStr = properties.getString("format"); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java index 9bf724d..93b29d9 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java @@ -44,7 +44,7 @@ public class BeamSqlCliTest { + "name varchar(31) COMMENT 'name', \n" + "age int COMMENT 'age') \n" + "TYPE 'text' \n" - + "COMMENT '' LOCATION 'text://home/admin/orders'" + + "COMMENT '' LOCATION '/home/admin/orders'" ); Table table = metaStore.getTable("person"); assertNotNull(table); @@ -63,7 +63,7 @@ public class BeamSqlCliTest { + "name varchar(31) COMMENT 'name', \n" + "age int COMMENT 'age') \n" + "TYPE 'text' \n" - + "COMMENT '' LOCATION 'text://home/admin/orders'" + + "COMMENT '' LOCATION '/home/admin/orders'" ); Table table = metaStore.getTable("person"); assertNotNull(table); @@ -86,7 +86,7 @@ public class BeamSqlCliTest { + "name varchar(31) COMMENT 'name', \n" + "age int COMMENT 'age') \n" + "TYPE 'text' \n" - + "COMMENT '' LOCATION 'text://home/admin/orders'" + + "COMMENT '' LOCATION '/home/admin/orders'" ); cli.execute("drop table person"); cli.explainQuery("select * from person"); @@ -107,7 +107,7 @@ public class BeamSqlCliTest { + "name varchar(31) COMMENT 'name', \n" + "age int COMMENT 'age') \n" + "TYPE 'text' \n" - + "COMMENT '' LOCATION 'text://home/admin/orders'" + + "COMMENT '' LOCATION '/home/admin/orders'" ); String plan = cli.explainQuery("select * from person"); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java index 14fb484..6d5ac1a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.ImmutableList; -import java.net.URI; import org.apache.beam.sdk.extensions.sql.RowSqlTypes; import org.apache.beam.sdk.extensions.sql.meta.Column; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -50,7 +49,7 @@ public class BeamSqlParserTest { + "name varchar(31) COMMENT 'name') \n" + "TYPE 'text' \n" + "COMMENT 'person table' \n" - + "LOCATION 'text://home/admin/person'\n" + + "LOCATION '/home/admin/person'\n" + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'" ); assertEquals( @@ -66,7 +65,7 @@ public class BeamSqlParserTest { + "id int COMMENT 'id', \n" + "name varchar(31) COMMENT 'name') \n" + "COMMENT 'person table' \n" - + "LOCATION 'text://home/admin/person'\n" + + "LOCATION '/home/admin/person'\n" + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'" ); } @@ -84,7 +83,7 @@ public class BeamSqlParserTest { + "id int COMMENT 'id', \n" + "name varchar(31) COMMENT 'name') \n" + "TYPE 'text' \n" - + "LOCATION 'text://home/admin/person'\n" + + "LOCATION '/home/admin/person'\n" + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'" ); assertEquals(mockTable("person", "text", null, properties), table); @@ -98,7 +97,7 @@ public class BeamSqlParserTest { + "name varchar(31) COMMENT 'name') \n" + "TYPE 'text' \n" + "COMMENT 'person table' \n" - + "LOCATION 'text://home/admin/person'\n" + + "LOCATION '/home/admin/person'\n" ); assertEquals( mockTable("person", "text", "person table", new JSONObject()), @@ -145,21 +144,17 @@ public class BeamSqlParserTest { } private static Table mockTable(String name, String type, String comment, JSONObject properties) { - return mockTable(name, type, comment, properties, "text://home/admin/" + name); + return mockTable(name, type, comment, properties, "/home/admin/" + name); } private static Table mockTable(String name, String type, String comment, JSONObject properties, String location) { - URI locationURI = null; - if (location != null) { - locationURI = URI.create(location); - } return Table.builder() .name(name) .type(type) .comment(comment) - .location(locationURI) + .location(location) .columns(ImmutableList.of( Column.builder() .name("id") diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java index f4253dd..47d3b06 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -36,7 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -123,7 +123,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest { } @Override - public PTransform<? super PCollection<Row>, PDone> buildIOWriter() { + public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { throw new UnsupportedOperationException(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java index ffe8dcc..58b048e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.ImmutableList; -import java.net.URI; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.RowSqlTypes; import org.apache.beam.sdk.extensions.sql.meta.Column; @@ -65,7 +64,7 @@ public class KafkaTableProviderTest { return Table.builder() .name(name) .comment(name + " table") - .location(URI.create("kafka://localhost:2181/brokers?topic=test")) + .location("kafka://localhost:2181/brokers?topic=test") .columns(ImmutableList.of( Column.builder() .name("id") diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java index 11ecaf4..7b47e1a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.ImmutableList; -import java.net.URI; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.RowSqlTypes; import org.apache.beam.sdk.extensions.sql.meta.Column; @@ -76,7 +75,7 @@ public class TextTableProviderTest { return Table.builder() .name(name) .comment(name + " table") - .location(URI.create("text://home/admin/" + name)) + .location("/home/admin/" + name) .columns(ImmutableList.of( Column.builder() .name("id") diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java index a3e391e..2ee027a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertThat; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.ImmutableList; -import java.net.URI; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; @@ -129,7 +128,7 @@ public class InMemoryMetaStoreTest { return Table.builder() .name(name) .comment(name + " table") - .location(URI.create("text://home/admin/" + name)) + .location("/home/admin/" + name) .columns(ImmutableList.of( Column.builder() .name("id") diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java index 249daee..24d22e1 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; /** @@ -104,7 +105,7 @@ public class MockedBoundedTable extends MockedTable { "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows)); } - @Override public PTransform<? super PCollection<Row>, PDone> buildIOWriter() { + @Override public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { return new OutputStore(); } @@ -112,7 +113,7 @@ public class MockedBoundedTable extends MockedTable { * Keep output in {@code CONTENT} for validation. * */ - public static class OutputStore extends PTransform<PCollection<Row>, PDone> { + public static class OutputStore extends PTransform<PCollection<Row>, POutput> { @Override public PDone expand(PCollection<Row> input) { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java index 0ed77ba..cd52de0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; /** @@ -36,7 +36,7 @@ public abstract class MockedTable extends BaseBeamTable { } @Override - public PTransform<? super PCollection<Row>, PDone> buildIOWriter() { + public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { throw new UnsupportedOperationException("buildIOWriter unsupported!"); } } -- To stop receiving notification emails like this one, please contact ming...@apache.org.