This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e981b43  [BEAM-3983] [SQL] Tables interface supports BigQuery (#4991)
e981b43 is described below

commit e981b439b6924dba20e7285091d343ae4d41765a
Author: Andrew Pilloud <apill...@users.noreply.github.com>
AuthorDate: Thu Apr 12 11:33:00 2018 -0700

    [BEAM-3983] [SQL] Tables interface supports BigQuery (#4991)
    
    * [SQL] POutput interface instead of PDone type
    
    * [SQL] Location isn't necessarily a URI
---
 .../apache/beam/sdk/extensions/sql/BeamSqlTable.java    |  4 ++--
 .../sdk/extensions/sql/impl/parser/SqlCreateTable.java  |  5 ++---
 .../sql/impl/schema/BeamPCollectionTable.java           |  4 ++--
 .../org/apache/beam/sdk/extensions/sql/meta/Table.java  | 13 ++-----------
 .../sql/meta/provider/kafka/BeamKafkaTable.java         |  8 ++++----
 .../sql/meta/provider/text/BeamTextCSVTable.java        |  4 ++--
 .../meta/provider/text/BeamTextCSVTableIOWriter.java    |  6 +++---
 .../sql/meta/provider/text/BeamTextTable.java           |  3 +--
 .../sql/meta/provider/text/TextTableProvider.java       |  4 ++--
 .../apache/beam/sdk/extensions/sql/BeamSqlCliTest.java  |  8 ++++----
 .../extensions/sql/impl/parser/BeamSqlParserTest.java   | 17 ++++++-----------
 .../sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java |  4 ++--
 .../sql/meta/provider/kafka/KafkaTableProviderTest.java |  3 +--
 .../sql/meta/provider/text/TextTableProviderTest.java   |  3 +--
 .../sql/meta/store/InMemoryMetaStoreTest.java           |  3 +--
 .../sdk/extensions/sql/mock/MockedBoundedTable.java     |  5 +++--
 .../beam/sdk/extensions/sql/mock/MockedTable.java       |  4 ++--
 17 files changed, 40 insertions(+), 58 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 5d484c9..6598ee3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 
 /**
@@ -46,7 +46,7 @@ public interface BeamSqlTable {
    * create a {@code IO.write()} instance to write to target.
    *
    */
-   PTransform<? super PCollection<Row>, PDone> buildIOWriter();
+   PTransform<? super PCollection<Row>, POutput> buildIOWriter();
 
   /**
    * Get the schema info of the table.
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index 794e3e6..15e8b96 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.extensions.sql.impl.parser;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.base.Strings;
-import java.net.URI;
 import java.util.List;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -109,8 +108,8 @@ public class SqlCreateTable extends SqlCall {
     return tblName.toString();
   }
 
-  public URI location() {
-    return location == null ? null : URI.create(getString(location));
+  public String location() {
+    return location == null ? null : getString(location);
   }
 
   public String type() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
index 5a24f47..0b10b4c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 
 /**
@@ -56,7 +56,7 @@ public class BeamPCollectionTable extends BaseBeamTable {
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
     throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
index 4af82a0..a3339af 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.extensions.sql.meta;
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import java.net.URI;
 import java.util.List;
 import javax.annotation.Nullable;
 
@@ -37,7 +36,7 @@ public abstract class Table implements Serializable {
   @Nullable
   public abstract String getComment();
   @Nullable
-  public abstract URI getLocation();
+  public abstract String getLocation();
   @Nullable
   public abstract JSONObject getProperties();
 
@@ -45,14 +44,6 @@ public abstract class Table implements Serializable {
     return new org.apache.beam.sdk.extensions.sql.meta.AutoValue_Table.Builder();
   }
 
-  public String getLocationAsString() {
-    if (getLocation() == null) {
-      return null;
-    }
-
-    return "/" + getLocation().getHost() + getLocation().getPath();
-  }
-
   /**
    * Builder class for {@link Table}.
    */
@@ -62,7 +53,7 @@ public abstract class Table implements Serializable {
     public abstract Builder name(String name);
     public abstract Builder columns(List<Column> columns);
     public abstract Builder comment(String name);
-    public abstract Builder location(URI location);
+    public abstract Builder location(String location);
     public abstract Builder properties(JSONObject properties);
     public abstract Table build();
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 4d31e4c..129e15e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
@@ -33,6 +32,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -43,7 +43,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
  * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
  *
  */
-public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+public abstract class BeamKafkaTable extends BaseBeamTable {
   private String bootstrapServers;
   private List<String> topics;
   private List<TopicPartition> topicPartitions;
@@ -109,11 +109,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
     checkArgument(topics != null && topics.size() == 1,
         "Only one topic can be acceptable as output.");
 
-    return new PTransform<PCollection<Row>, PDone>() {
+    return new PTransform<PCollection<Row>, POutput>() {
       @Override
       public PDone expand(PCollection<Row> input) {
         return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
index e407a4d..d0b5900 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.commons.csv.CSVFormat;
 
@@ -62,7 +62,7 @@ public class BeamTextCSVTable extends BeamTextTable {
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
     return new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat);
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
index d32c9dfe..aea826d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
@@ -27,14 +27,14 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.commons.csv.CSVFormat;
 
 /**
  * IOWriter for {@code BeamTextCSVTable}.
  */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>, PDone>
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>, POutput>
     implements Serializable {
   private String filePattern;
   protected Schema schema;
@@ -49,7 +49,7 @@ public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>, PDone
   }
 
   @Override
-  public PDone expand(PCollection<Row> input) {
+  public POutput expand(PCollection<Row> input) {
     return input.apply("encodeRecord", ParDo.of(new DoFn<Row, String>() {
 
       @ProcessElement
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
index 841f4e2..0a5e9ee 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
-import java.io.Serializable;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
 import org.apache.beam.sdk.schemas.Schema;
@@ -26,7 +25,7 @@ import org.apache.beam.sdk.schemas.Schema;
 /**
  * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
  */
-public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+public abstract class BeamTextTable extends BaseBeamTable {
   protected String filePattern;
 
   protected BeamTextTable(Schema schema, String filePattern) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 6a612c4..102069a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -40,7 +40,7 @@ import org.apache.commons.csv.CSVFormat;
  * )
  * TYPE 'text'
  * COMMENT 'this is the table orders'
- * LOCATION 'text://home/admin/orders'
+ * LOCATION '/home/admin/orders'
  * TBLPROPERTIES '{"format": "Excel"}' -- format of each text line(csv format)
  * }</pre>
  */
@@ -53,7 +53,7 @@ public class TextTableProvider implements TableProvider {
   @Override public BeamSqlTable buildBeamSqlTable(Table table) {
     Schema schema = getRowTypeFromTable(table);
 
-    String filePattern = table.getLocationAsString();
+    String filePattern = table.getLocation();
     CSVFormat format = CSVFormat.DEFAULT;
     JSONObject properties = table.getProperties();
     String csvFormatStr = properties.getString("format");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 9bf724d..93b29d9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -44,7 +44,7 @@ public class BeamSqlCliTest {
         + "name varchar(31) COMMENT 'name', \n"
         + "age int COMMENT 'age') \n"
         + "TYPE 'text' \n"
-        + "COMMENT '' LOCATION 'text://home/admin/orders'"
+        + "COMMENT '' LOCATION '/home/admin/orders'"
     );
     Table table = metaStore.getTable("person");
     assertNotNull(table);
@@ -63,7 +63,7 @@ public class BeamSqlCliTest {
             + "name varchar(31) COMMENT 'name', \n"
             + "age int COMMENT 'age') \n"
             + "TYPE 'text' \n"
-            + "COMMENT '' LOCATION 'text://home/admin/orders'"
+            + "COMMENT '' LOCATION '/home/admin/orders'"
     );
     Table table = metaStore.getTable("person");
     assertNotNull(table);
@@ -86,7 +86,7 @@ public class BeamSqlCliTest {
             + "name varchar(31) COMMENT 'name', \n"
             + "age int COMMENT 'age') \n"
             + "TYPE 'text' \n"
-            + "COMMENT '' LOCATION 'text://home/admin/orders'"
+            + "COMMENT '' LOCATION '/home/admin/orders'"
     );
     cli.execute("drop table person");
     cli.explainQuery("select * from person");
@@ -107,7 +107,7 @@ public class BeamSqlCliTest {
             + "name varchar(31) COMMENT 'name', \n"
             + "age int COMMENT 'age') \n"
             + "TYPE 'text' \n"
-            + "COMMENT '' LOCATION 'text://home/admin/orders'"
+            + "COMMENT '' LOCATION '/home/admin/orders'"
     );
 
     String plan = cli.explainQuery("select * from person");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 14fb484..6d5ac1a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.ImmutableList;
-import java.net.URI;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
 import org.apache.beam.sdk.extensions.sql.meta.Column;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -50,7 +49,7 @@ public class BeamSqlParserTest {
             + "name varchar(31) COMMENT 'name') \n"
             + "TYPE 'text' \n"
             + "COMMENT 'person table' \n"
-            + "LOCATION 'text://home/admin/person'\n"
+            + "LOCATION '/home/admin/person'\n"
             + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
     );
     assertEquals(
@@ -66,7 +65,7 @@ public class BeamSqlParserTest {
             + "id int COMMENT 'id', \n"
             + "name varchar(31) COMMENT 'name') \n"
             + "COMMENT 'person table' \n"
-            + "LOCATION 'text://home/admin/person'\n"
+            + "LOCATION '/home/admin/person'\n"
             + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
     );
   }
@@ -84,7 +83,7 @@ public class BeamSqlParserTest {
             + "id int COMMENT 'id', \n"
             + "name varchar(31) COMMENT 'name') \n"
             + "TYPE 'text' \n"
-            + "LOCATION 'text://home/admin/person'\n"
+            + "LOCATION '/home/admin/person'\n"
             + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
     );
     assertEquals(mockTable("person", "text", null, properties), table);
@@ -98,7 +97,7 @@ public class BeamSqlParserTest {
             + "name varchar(31) COMMENT 'name') \n"
             + "TYPE 'text' \n"
             + "COMMENT 'person table' \n"
-            + "LOCATION 'text://home/admin/person'\n"
+            + "LOCATION '/home/admin/person'\n"
     );
     assertEquals(
         mockTable("person", "text", "person table", new JSONObject()),
@@ -145,21 +144,17 @@ public class BeamSqlParserTest {
   }
 
   private static Table mockTable(String name, String type, String comment, JSONObject properties) {
-    return mockTable(name, type, comment, properties, "text://home/admin/" + name);
+    return mockTable(name, type, comment, properties, "/home/admin/" + name);
   }
 
   private static Table mockTable(String name, String type, String comment, JSONObject properties,
       String location) {
-    URI locationURI = null;
-    if (location != null) {
-      locationURI = URI.create(location);
-    }
 
     return Table.builder()
         .name(name)
         .type(type)
         .comment(comment)
-        .location(locationURI)
+        .location(location)
         .columns(ImmutableList.of(
             Column.builder()
                 .name("id")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index f4253dd..47d3b06 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -123,7 +123,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
     }
 
     @Override
-    public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+    public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
       throw new UnsupportedOperationException();
     }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index ffe8dcc..58b048e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.ImmutableList;
-import java.net.URI;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
 import org.apache.beam.sdk.extensions.sql.meta.Column;
@@ -65,7 +64,7 @@ public class KafkaTableProviderTest {
     return Table.builder()
         .name(name)
         .comment(name + " table")
-        .location(URI.create("kafka://localhost:2181/brokers?topic=test"))
+        .location("kafka://localhost:2181/brokers?topic=test")
         .columns(ImmutableList.of(
             Column.builder()
                 .name("id")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index 11ecaf4..7b47e1a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.ImmutableList;
-import java.net.URI;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
 import org.apache.beam.sdk.extensions.sql.meta.Column;
@@ -76,7 +75,7 @@ public class TextTableProviderTest {
     return Table.builder()
         .name(name)
         .comment(name + " table")
-        .location(URI.create("text://home/admin/" + name))
+        .location("/home/admin/" + name)
         .columns(ImmutableList.of(
             Column.builder()
                 .name("id")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index a3e391e..2ee027a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertThat;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.ImmutableList;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
@@ -129,7 +128,7 @@ public class InMemoryMetaStoreTest {
     return Table.builder()
         .name(name)
         .comment(name + " table")
-        .location(URI.create("text://home/admin/" + name))
+        .location("/home/admin/" + name)
         .columns(ImmutableList.of(
             Column.builder()
                 .name("id")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index 249daee..24d22e1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 
 /**
@@ -104,7 +105,7 @@ public class MockedBoundedTable extends MockedTable {
         "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
   }
 
-  @Override public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+  @Override public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
     return new OutputStore();
   }
 
@@ -112,7 +113,7 @@ public class MockedBoundedTable extends MockedTable {
    * Keep output in {@code CONTENT} for validation.
    *
    */
-  public static class OutputStore extends PTransform<PCollection<Row>, PDone> {
+  public static class OutputStore extends PTransform<PCollection<Row>, POutput> {
 
     @Override
     public PDone expand(PCollection<Row> input) {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 0ed77ba..cd52de0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 
 /**
@@ -36,7 +36,7 @@ public abstract class MockedTable extends BaseBeamTable {
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
     throw new UnsupportedOperationException("buildIOWriter unsupported!");
   }
 }

-- 
To stop receiving notification emails like this one, please contact
ming...@apache.org.

Reply via email to