Repository: beam
Updated Branches:
  refs/heads/master b47fd52cb -> 36a12d023


[BEAM-1786, BEAM-1871] Add the ability to register coder factories for classes 
allowing TableRowJsonCoder to move to sdks/java/io/google-cloud-platform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c811f5e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c811f5e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c811f5e

Branch: refs/heads/master
Commit: 3c811f5eabc823c5895a7b3c62379370bff8b22c
Parents: b47fd52
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 19 17:59:15 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Apr 20 14:19:03 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/StateTags.java |   6 +-
 .../beam/runners/core/ReduceFnTester.java       |  12 +--
 .../apache/beam/runners/core/StateTagTest.java  |   3 +-
 .../spark/translation/SparkRuntimeContext.java  |   3 +-
 sdks/java/core/pom.xml                          |   5 -
 .../main/java/org/apache/beam/sdk/Pipeline.java |   3 +-
 .../apache/beam/sdk/coders/CoderFactories.java  |  22 +++-
 .../apache/beam/sdk/coders/CoderRegistrar.java  |  45 ++++++++
 .../apache/beam/sdk/coders/CoderRegistry.java   | 108 ++++++++++++++-----
 .../apache/beam/sdk/coders/StringUtf8Coder.java |   2 +-
 .../beam/sdk/coders/TableRowJsonCoder.java      |  88 ---------------
 .../apache/beam/sdk/util/state/StateSpecs.java  |   6 +-
 .../org/apache/beam/SdkCoreApiSurfaceTest.java  |   1 -
 .../beam/sdk/coders/CoderRegistryTest.java      |  77 +++++++------
 .../beam/sdk/coders/DefaultCoderTest.java       |   8 +-
 .../beam/sdk/coders/TableRowJsonCoderTest.java  |  94 ----------------
 .../beam/sdk/transforms/LatestFnTest.java       |   2 +-
 .../org/apache/beam/sdk/transforms/SumTest.java |   2 +-
 .../transforms/reflect/DoFnInvokersTest.java    |   4 +-
 sdks/java/io/google-cloud-platform/pom.xml      |   5 +
 .../io/gcp/bigquery/BigQueryCoderRegistrar.java |  39 +++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   1 -
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   1 -
 .../io/gcp/bigquery/StreamingWriteTables.java   |   1 -
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |   1 -
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  |  91 ++++++++++++++++
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |   1 -
 .../bigquery/BigQueryCoderRegistrarTest.java    |  40 +++++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   1 -
 .../io/gcp/bigquery/FakeBigQueryServices.java   |   1 -
 .../sdk/io/gcp/bigquery/FakeJobService.java     |   1 -
 .../io/gcp/bigquery/TableRowJsonCoderTest.java  |  95 ++++++++++++++++
 32 files changed, 479 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 77ae8f5..3a45569 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -47,11 +47,7 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState;
 @Experimental(Kind.STATE)
 public class StateTags {
 
-  private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
-
-  static {
-    STANDARD_REGISTRY.registerStandardCoders();
-  }
+  private static final CoderRegistry STANDARD_REGISTRY = 
CoderRegistry.createDefault();
 
   /** @deprecated for migration purposes only */
   @Deprecated

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 914550e..549fd8a 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -180,8 +180,7 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
           Coder<OutputT> outputCoder)
           throws Exception {
 
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
+    CoderRegistry registry = CoderRegistry.createDefault();
     AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
         AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
             combineFn, registry, KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()));
@@ -207,8 +206,7 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
       Coder<OutputT> outputCoder)
       throws Exception {
 
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
+    CoderRegistry registry = CoderRegistry.createDefault();
     AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
         AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
             combineFn, registry, KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()));
@@ -230,8 +228,7 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
           PipelineOptions options,
           SideInputReader sideInputReader)
           throws Exception {
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
+    CoderRegistry registry = CoderRegistry.createDefault();
     AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
         AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
             combineFn, registry, KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()));
@@ -254,8 +251,7 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
       PipelineOptions options,
       SideInputReader sideInputReader)
       throws Exception {
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
+    CoderRegistry registry = CoderRegistry.createDefault();
     AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
         AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
             combineFn, registry, KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()));

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 0584643..5f5d92d 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -158,8 +158,7 @@ public class StateTagTest {
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Test
   public void testCombiningValueWithContextEquality() {
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
+    CoderRegistry registry = CoderRegistry.createDefault();
 
     Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers();
     Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 9d0f576..6abab17 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -112,8 +112,7 @@ public class SparkRuntimeContext implements Serializable {
 
   public CoderRegistry getCoderRegistry() {
     if (coderRegistry == null) {
-      coderRegistry = new CoderRegistry();
-      coderRegistry.registerStandardCoders();
+      coderRegistry = CoderRegistry.createDefault();
     }
     return coderRegistry;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 2860be2..930632d 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -141,11 +141,6 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-bigquery</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-storage</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index e3b9309..0a1dc13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -292,8 +292,7 @@ public class Pipeline {
    */
   public CoderRegistry getCoderRegistry() {
     if (coderRegistry == null) {
-      coderRegistry = new CoderRegistry();
-      coderRegistry.registerStandardCoders();
+      coderRegistry = CoderRegistry.createDefault();
     }
     return coderRegistry;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
index e1a202a..0031698 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.common.base.MoreObjects;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -129,13 +130,13 @@ public final class CoderFactories {
     // Method to create a coder given component coders
     // For a Coder class of kind * -> * -> ... n times ... -> *
     // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
-    private Method factoryMethod;
+    private final Method factoryMethod;
 
     // Method to decompose a value of type T into its parts.
     // For a Coder class of kind * -> * -> ... n times ... -> *
     // this has type T -> List<Object>
     // where the list has n elements.
-    private Method getComponentsMethod;
+    private final Method getComponentsMethod;
 
     /**
      * Returns a CoderFactory that invokes the given static factory method
@@ -248,6 +249,14 @@ public final class CoderFactories {
           "cannot build CoderFactory from class " + coderType
           + ": does not implement Coder<T> for any T.");
     }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("factoryMethod", factoryMethod)
+          .add("getComponentsMethod", getComponentsMethod)
+          .toString();
+    }
   }
 
   /**
@@ -255,7 +264,7 @@ public final class CoderFactories {
    * {@link CoderFactory}.
    */
   private static class CoderFactoryForCoder<T> implements CoderFactory {
-    private Coder<T> coder;
+    private final Coder<T> coder;
 
     public CoderFactoryForCoder(Coder<T> coder) {
       this.coder = coder;
@@ -270,5 +279,12 @@ public final class CoderFactories {
     public List<Object> getInstanceComponents(Object value) {
       return Collections.emptyList();
     }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("coder", coder)
+          .toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
new file mode 100644
index 0000000..fced976
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * {@link Coder} creators have the ability to automatically have their
+ * {@link Coder coders} registered with this SDK by creating a {@link 
ServiceLoader} entry
+ * and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools 
such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+@Experimental
+public interface CoderRegistrar {
+  /**
+   * Returns a mapping of {@link Class classes} to {@link CoderFactory coder 
factories} which
+   * will be registered by default within each {@link CoderRegistry coder 
registry} instance.
+   *
+   * <p>See {@link CoderFactories} for convenience methods to construct a 
{@link CoderFactory}.
+   *
+   * <p>Note that a warning is logged if multiple {@link CoderRegistrar coder 
registrars} provide
+   * mappings for the same {@link Class}.
+   */
+  Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 65f4209..6b909d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -19,10 +19,14 @@ package org.apache.beam.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -35,13 +39,17 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.Set;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -63,8 +71,7 @@ import org.slf4j.LoggerFactory;
  *       <li>A {@link Coder} class with the static methods to satisfy
  *           {@link CoderFactories#fromStaticMethods} can be registered via
  *           {@link #registerCoder(Class, Class)}.</li>
- *       <li>Built-in types are registered via
- *           {@link #registerStandardCoders()}.</li>
+ *       <li>Types can be automatically registered via {@link CoderRegistrar 
coder registrars}.</li>
  *     </ul>
  *   <li>Annotations: {@link DefaultCoder} can be used to annotate a type with
  *       the default {@code Coder} type. The {@link Coder} class must satisfy 
the requirements
@@ -82,33 +89,86 @@ import org.slf4j.LoggerFactory;
 public class CoderRegistry implements CoderProvider {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CoderRegistry.class);
+  private static final Map<Class<?>, CoderFactory> 
REGISTERED_CODER_FACTORIES_PER_CLASS;
+
+  static {
+    // Register the standard coders first so they are choosen as the default
+    Multimap<Class<?>, CoderFactory> codersToRegister = HashMultimap.create();
+    codersToRegister.put(Byte.class, 
CoderFactories.fromStaticMethods(ByteCoder.class));
+    codersToRegister.put(ByteString.class, 
CoderFactories.fromStaticMethods(ByteStringCoder.class));
+    codersToRegister.put(Double.class, 
CoderFactories.fromStaticMethods(DoubleCoder.class));
+    codersToRegister.put(Instant.class, 
CoderFactories.fromStaticMethods(InstantCoder.class));
+    codersToRegister.put(Integer.class, 
CoderFactories.fromStaticMethods(VarIntCoder.class));
+    codersToRegister.put(Iterable.class, 
CoderFactories.fromStaticMethods(IterableCoder.class));
+    codersToRegister.put(KV.class, 
CoderFactories.fromStaticMethods(KvCoder.class));
+    codersToRegister.put(List.class, 
CoderFactories.fromStaticMethods(ListCoder.class));
+    codersToRegister.put(Long.class, 
CoderFactories.fromStaticMethods(VarLongCoder.class));
+    codersToRegister.put(Map.class, 
CoderFactories.fromStaticMethods(MapCoder.class));
+    codersToRegister.put(Set.class, 
CoderFactories.fromStaticMethods(SetCoder.class));
+    codersToRegister.put(String.class, 
CoderFactories.fromStaticMethods(StringUtf8Coder.class));
+    codersToRegister.put(TimestampedValue.class,
+        
CoderFactories.fromStaticMethods(TimestampedValue.TimestampedValueCoder.class));
+    codersToRegister.put(Void.class, 
CoderFactories.fromStaticMethods(VoidCoder.class));
+    codersToRegister.put(byte[].class, 
CoderFactories.fromStaticMethods(ByteArrayCoder.class));
+    codersToRegister.put(IntervalWindow.class, 
CoderFactories.forCoder(IntervalWindow.getCoder()));
+
+    // Enumerate all the CoderRegistrars in a deterministic order, adding all 
coders to register
+    Set<CoderRegistrar> registrars = 
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+    registrars.addAll(Lists.newArrayList(
+        ServiceLoader.load(CoderRegistrar.class, 
ReflectHelpers.findClassLoader())));
+    for (CoderRegistrar registrar : registrars) {
+      for (Map.Entry<Class<?>, CoderFactory> entry
+          : registrar.getCoderFactoriesToUseForClasses().entrySet()) {
+        codersToRegister.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // Warn the user if multiple coders want to be registered for the same 
class
+    Map<Class<?>, Collection<CoderFactory>> multipleRegistrations =
+        Maps.filterValues(codersToRegister.asMap(), new 
Predicate<Collection<CoderFactory>>() {
+      @Override
+      public boolean apply(@Nonnull Collection<CoderFactory> input) {
+        return input.size() > 1;
+      }
+    });
+    for (Map.Entry<Class<?>, Collection<CoderFactory>> entry : 
multipleRegistrations.entrySet()) {
+      LOG.warn("Multiple CoderFactory registrations {} found for class {}, 
using {}.",
+          entry.getKey(), entry.getValue(), 
entry.getValue().iterator().next());
+    }
+
+    // Build a map choosing the first coder within the multimap as the default
+    ImmutableMap.Builder<Class<?>, CoderFactory> 
registeredCoderFactoriesPerClassBuilder =
+        ImmutableMap.builder();
+    for (Map.Entry<Class<?>, Collection<CoderFactory>> entry
+        : codersToRegister.asMap().entrySet()) {
+      registeredCoderFactoriesPerClassBuilder.put(
+          entry.getKey(), entry.getValue().iterator().next());
+    }
+    REGISTERED_CODER_FACTORIES_PER_CLASS = 
registeredCoderFactoriesPerClassBuilder.build();
+  }
+
+  /**
+   * Creates a CoderRegistry containing registrations for all standard coders 
part of the core Java
+   * Apache Beam SDK and also any registrations provided by {@link 
CoderRegistrar coder registrars}.
+   *
+   * <p>Multiple registrations for the same class result in the (in order of 
precedence):
+   * <ul>
+   *   <li>Standard coder part of the core Apache Beam Java SDK being 
used.</li>
+   *   <li>The coder from the {@link CoderRegistrar} with the 
lexicographically smallest
+   *   {@link Class#getName() class name} being used.</li>
+   * </ul>
+   */
+  public static CoderRegistry createDefault() {
+    return new CoderRegistry();
+  }
 
   public CoderRegistry() {
+    coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS);
     setFallbackCoderProvider(
         CoderProviders.firstOf(ProtoCoder.coderProvider(), 
SerializableCoder.PROVIDER));
   }
 
-  /**
-   * Registers standard Coders with this CoderRegistry.
-   */
   public void registerStandardCoders() {
-    registerCoder(Byte.class, ByteCoder.class);
-    registerCoder(ByteString.class, ByteStringCoder.class);
-    registerCoder(Double.class, DoubleCoder.class);
-    registerCoder(Instant.class, InstantCoder.class);
-    registerCoder(Integer.class, VarIntCoder.class);
-    registerCoder(Iterable.class, IterableCoder.class);
-    registerCoder(KV.class, KvCoder.class);
-    registerCoder(List.class, ListCoder.class);
-    registerCoder(Long.class, VarLongCoder.class);
-    registerCoder(Map.class, MapCoder.class);
-    registerCoder(Set.class, SetCoder.class);
-    registerCoder(String.class, StringUtf8Coder.class);
-    registerCoder(TableRow.class, TableRowJsonCoder.class);
-    registerCoder(TimestampedValue.class, 
TimestampedValue.TimestampedValueCoder.class);
-    registerCoder(Void.class, VoidCoder.class);
-    registerCoder(byte[].class, ByteArrayCoder.class);
-    registerCoder(IntervalWindow.class, IntervalWindow.getCoder());
   }
 
   /**
@@ -642,7 +702,7 @@ public class CoderRegistry implements CoderProvider {
    * The map of classes to the CoderFactories to use to create their
    * default Coders.
    */
-  private Map<Class<?>, CoderFactory> coderFactoryMap = new HashMap<>();
+  private Map<Class<?>, CoderFactory> coderFactoryMap;
 
   /**
    * A provider of coders for types where no coder is registered.

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index ca7912c..cd124ef 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -127,7 +127,7 @@ public class StringUtf8Coder extends AtomicCoder<String> {
    * the byte size of the encoding plus the encoded length prefix.
    */
   @Override
-  protected long getEncodedElementByteSize(String value, Context context)
+  public long getEncodedElementByteSize(String value, Context context)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null String");

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
deleted file mode 100644
index 5c0929c..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.google.api.services.bigquery.model.TableRow;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their 
native JSON format.
- */
-public class TableRowJsonCoder extends AtomicCoder<TableRow> {
-
-  @JsonCreator
-  public static TableRowJsonCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(TableRow value, OutputStream outStream, Context context)
-      throws IOException {
-    String strValue = MAPPER.writeValueAsString(value);
-    StringUtf8Coder.of().encode(strValue, outStream, context);
-  }
-
-  @Override
-  public TableRow decode(InputStream inStream, Context context)
-      throws IOException {
-    String strValue = StringUtf8Coder.of().decode(inStream, context);
-    return MAPPER.readValue(strValue, TableRow.class);
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(TableRow value, Context context)
-      throws Exception {
-    String strValue = MAPPER.writeValueAsString(value);
-    return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
-  // TableRow.
-  private static final ObjectMapper MAPPER =
-      new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
-
-  private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
-  private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new 
TypeDescriptor<TableRow>() {};
-
-  private TableRowJsonCoder() { }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. A {@link TableRow} can hold 
arbitrary
-   *         {@link Object} instances, which makes the encoding 
non-deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "TableCell can hold arbitrary instances, which may be 
non-deterministic.");
-  }
-
-  @Override
-  public TypeDescriptor<TableRow> getEncodedTypeDescriptor() {
-    return TYPE_DESCRIPTOR;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 30a7a6d..dc647da 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -38,11 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 @Experimental(Kind.STATE)
 public class StateSpecs {
 
-  private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
-
-  static {
-    STANDARD_REGISTRY.registerStandardCoders();
-  }
+  private static final CoderRegistry STANDARD_REGISTRY = 
CoderRegistry.createDefault();
 
   private StateSpecs() {}
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index eed4457..b6e9205 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -39,7 +39,6 @@ public class SdkCoreApiSurfaceTest {
         ImmutableSet.of(
             "org.apache.beam",
             "com.google.api.client",
-            "com.google.api.services.bigquery",
             "com.google.api.services.storage",
             "com.google.protobuf",
             "com.fasterxml.jackson.annotation",

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 10177e7..774ca9d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -21,8 +21,10 @@ import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 
+import com.google.auto.service.AutoService;
 import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Duration;
 import java.io.IOException;
 import java.io.InputStream;
@@ -37,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -67,11 +70,8 @@ public class CoderRegistryTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  public static CoderRegistry getStandardRegistry() {
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
-    return registry;
-  }
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(CoderRegistry.class);
 
   private static class SerializableClass implements Serializable {
   }
@@ -80,7 +80,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testSerializableFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
     Coder<?> serializableCoder = 
registry.getDefaultCoder(SerializableClass.class);
 
@@ -89,7 +89,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testProtoCoderFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
 
     // MessageA is a Protocol Buffers test message with syntax 2
     assertEquals(registry.getDefaultCoder(MessageA.class), 
ProtoCoder.of(MessageA.class));
@@ -100,7 +100,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testAvroFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
     Coder<?> avroCoder = registry.getDefaultCoder(NotSerializableClass.class);
 
@@ -109,7 +109,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testRegisterInstantiatedCoder() throws Exception {
-    CoderRegistry registry = new CoderRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     registry.registerCoder(MyValue.class, MyValueCoder.of());
     assertEquals(registry.getDefaultCoder(MyValue.class), MyValueCoder.of());
   }
@@ -137,19 +137,19 @@ public class CoderRegistryTest {
   public void testRegisterInstantiatedCoderInvalidRawtype() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("may not be used with unspecialized generic classes");
-    CoderRegistry registry = new CoderRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     registry.registerCoder(List.class, new MyListCoder());
   }
 
   @Test
   public void testSimpleDefaultCoder() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(StringUtf8Coder.of(), registry.getDefaultCoder(String.class));
   }
 
   @Test
   public void testSimpleUnknownDefaultCoder() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage(allOf(
         containsString(UnknownType.class.getCanonicalName()),
@@ -161,7 +161,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testParameterizedDefaultListCoder() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<List<Integer>> listToken = new 
TypeDescriptor<List<Integer>>() {};
     assertEquals(ListCoder.of(VarIntCoder.of()),
                  registry.getDefaultCoder(listToken));
@@ -177,7 +177,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testParameterizedDefaultMapCoder() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Map<Integer, String>> mapToken = new 
TypeDescriptor<Map<Integer, String>>() {};
     assertEquals(MapCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
                  registry.getDefaultCoder(mapToken));
@@ -185,7 +185,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testParameterizedDefaultNestedMapCoder() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Map<Integer, Map<String, Double>>> mapToken =
         new TypeDescriptor<Map<Integer, Map<String, Double>>>() {};
     assertEquals(
@@ -195,21 +195,21 @@ public class CoderRegistryTest {
 
   @Test
   public void testParameterizedDefaultSetCoder() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Set<Integer>> setToken = new TypeDescriptor<Set<Integer>>() 
{};
     assertEquals(SetCoder.of(VarIntCoder.of()), 
registry.getDefaultCoder(setToken));
   }
 
   @Test
   public void testParameterizedDefaultNestedSetCoder() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Set<Set<Integer>>> setToken = new 
TypeDescriptor<Set<Set<Integer>>>() {};
     assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), 
registry.getDefaultCoder(setToken));
   }
 
   @Test
   public void testParameterizedDefaultCoderUnknown() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<List<UnknownType>> listUnknownToken = new 
TypeDescriptor<List<UnknownType>>() {};
 
     thrown.expect(CannotProvideCoderException.class);
@@ -223,7 +223,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testTypeParameterInferenceForward() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     MyGenericClass<MyValue, List<MyValue>> instance =
         new MyGenericClass<MyValue, List<MyValue>>() {};
 
@@ -239,7 +239,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testTypeParameterInferenceBackward() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     MyGenericClass<MyValue, List<MyValue>> instance =
         new MyGenericClass<MyValue, List<MyValue>>() {};
 
@@ -256,7 +256,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testGetDefaultCoderFromIntegerValue() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     Integer i = 13;
     Coder<Integer> coder = registry.getDefaultCoder(i);
     assertEquals(VarIntCoder.of(), coder);
@@ -264,13 +264,13 @@ public class CoderRegistryTest {
 
   @Test
   public void testGetDefaultCoderFromNullValue() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(VoidCoder.of(), registry.getDefaultCoder((Void) null));
   }
 
   @Test
   public void testGetDefaultCoderFromKvValue() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     KV<Integer, String> kv = KV.of(13, "hello");
     Coder<KV<Integer, String>> coder = registry.getDefaultCoder(kv);
     assertEquals(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
@@ -279,7 +279,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testGetDefaultCoderFromKvNullValue() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     KV<Void, Void> kv = KV.of((Void) null, (Void) null);
     assertEquals(KvCoder.of(VoidCoder.of(), VoidCoder.of()),
         registry.getDefaultCoder(kv));
@@ -287,7 +287,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testGetDefaultCoderFromNestedKvValue() throws Exception {
-    CoderRegistry registry = getStandardRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     KV<Integer, KV<Long, KV<String, String>>> kv = KV.of(13, KV.of(17L, 
KV.of("hello", "goodbye")));
     Coder<KV<Integer, KV<Long, KV<String, String>>>> coder = 
registry.getDefaultCoder(kv);
     assertEquals(
@@ -346,8 +346,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testDefaultCoderAnnotationGenericRawtype() throws Exception {
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
+    CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(
         registry.getDefaultCoder(MySerializableGeneric.class),
         SerializableCoder.of(MySerializableGeneric.class));
@@ -355,8 +354,7 @@ public class CoderRegistryTest {
 
   @Test
   public void testDefaultCoderAnnotationGeneric() throws Exception {
-    CoderRegistry registry = new CoderRegistry();
-    registry.registerStandardCoders();
+    CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(
         registry.getDefaultCoder(new 
TypeDescriptor<MySerializableGeneric<String>>() {}),
         SerializableCoder.of(MySerializableGeneric.class));
@@ -383,7 +381,7 @@ public class CoderRegistryTest {
    */
   @Test
   public void testTypeVariableErrorMessage() throws Exception {
-    CoderRegistry registry = new CoderRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
 
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage(allOf(
@@ -399,7 +397,7 @@ public class CoderRegistryTest {
   @Test
   @SuppressWarnings("rawtypes")
   public void testSerializableTypeVariableDefaultCoder() throws Exception {
-    CoderRegistry registry = new CoderRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
 
     TypeDescriptor type = TypeDescriptor.of(
         TestSerializableGenericClass.class.getTypeParameters()[0]);
@@ -540,4 +538,21 @@ public class CoderRegistryTest {
     @SuppressWarnings("unused")
     private T foo;
   }
+
+  @Test
+  public void testAutomaticRegistrationOfCoders() throws Exception {
+    assertEquals(CoderRegistry.createDefault().getDefaultCoder(MyValue.class), 
MyValueCoder.of());
+  }
+
+  /**
+   * A {@link CoderRegistrar} to demonstrate default {@link Coder} 
registration.
+   */
+  @AutoService(CoderRegistrar.class)
+  public static class RegisteredTestCoderRegistrar implements CoderRegistrar {
+    @Override
+    public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
+      return ImmutableMap.<Class<?>, CoderFactory>of(
+          MyValue.class, CoderFactories.forCoder(MyValueCoder.of()));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
index 59749ae..d335b18 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertThat;
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -41,12 +40,7 @@ public class DefaultCoderTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  public CoderRegistry registry = new CoderRegistry();
-
-  @Before
-  public void registerStandardCoders() {
-    registry.registerStandardCoders();
-  }
+  public CoderRegistry registry = CoderRegistry.createDefault();
 
   @DefaultCoder(AvroCoder.class)
   private static class AvroRecord {

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java
deleted file mode 100644
index 5253ce7..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.api.services.bigquery.model.TableRow;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test case for {@link TableRowJsonCoder}.
- */
-@RunWith(JUnit4.class)
-public class TableRowJsonCoderTest {
-
-  private static class TableRowBuilder {
-    private TableRow row;
-    public TableRowBuilder() {
-      row = new TableRow();
-    }
-    public TableRowBuilder set(String fieldName, Object value) {
-      row.set(fieldName, value);
-      return this;
-    }
-    public TableRow build() {
-      return row;
-    }
-  }
-
-  private static final Coder<TableRow> TEST_CODER = TableRowJsonCoder.of();
-
-  private static final List<TableRow> TEST_VALUES = Arrays.asList(
-      new TableRowBuilder().build(),
-      new TableRowBuilder().set("a", "1").build(),
-      new TableRowBuilder().set("b", 3.14).build(),
-      new TableRowBuilder().set("a", "1").set("b", true).set("c", 
"hi").build());
-
-  @Test
-  public void testDecodeEncodeEqual() throws Exception {
-    for (TableRow value : TEST_VALUES) {
-      CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
-    }
-  }
-
-  // This identifier should only change if the JSON format of results from the 
BigQuery API changes.
-  private static final String EXPECTED_ENCODING_ID = "";
-
-  @Test
-  public void testEncodingId() throws Exception {
-    CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
-  }
-
-  /**
-   * Generated data to check that the wire format has not changed. To 
regenerate, see
-   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
-   */
-  private static final List<String> TEST_ENCODINGS = Arrays.asList(
-      "e30",
-      "eyJhIjoiMSJ9",
-      "eyJiIjozLjE0fQ",
-      "eyJhIjoiMSIsImIiOnRydWUsImMiOiJoaSJ9");
-
-  @Test
-  public void testWireFormatEncode() throws Exception {
-    CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, 
TEST_ENCODINGS);
-  }
-
-  @Test
-  public void testEncodedTypeDescriptor() throws Exception {
-    assertThat(TEST_CODER.getEncodedTypeDescriptor(), 
equalTo(TypeDescriptor.of(TableRow.class)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
index 31acb08..f49c765 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
@@ -196,7 +196,7 @@ public class LatestFnTest {
   public void testDefaultCoderHandlesNull() throws CannotProvideCoderException 
{
     Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
 
-    CoderRegistry registry = new CoderRegistry();
+    CoderRegistry registry = CoderRegistry.createDefault();
     TimestampedValue.TimestampedValueCoder<Long> inputCoder =
         TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
index b2f8aa8..9d2c6f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
@@ -37,7 +37,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class SumTest {
-  private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
+  private static final CoderRegistry STANDARD_REGISTRY = 
CoderRegistry.createDefault();
 
   @Test
   public void testSumGetNames() {

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 8b4df4c..5732438 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -335,7 +335,7 @@ public class DoFnInvokersTest {
     when(fn.newTracker(restriction)).thenReturn(tracker);
     fn.processElement(mockProcessContext, tracker);
 
-    assertEquals(coder, invoker.invokeGetRestrictionCoder(new 
CoderRegistry()));
+    assertEquals(coder, 
invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
     assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
     final List<SomeRestriction> outputs = new ArrayList<>();
     invoker.invokeSplitRestriction(
@@ -415,7 +415,7 @@ public class DoFnInvokersTest {
     MockFn fn = mock(MockFn.class);
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
 
-    CoderRegistry coderRegistry = new CoderRegistry();
+    CoderRegistry coderRegistry = CoderRegistry.createDefault();
     coderRegistry.registerCoder(RestrictionWithDefaultTracker.class, 
CoderForDefaultTracker.class);
     assertThat(
         
invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml 
b/sdks/java/io/google-cloud-platform/pom.xml
index 4cd0337..3778a63 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -77,6 +77,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-bigquery</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
new file mode 100644
index 0000000..847c7b5
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.CoderFactories;
+import org.apache.beam.sdk.coders.CoderFactory;
+import org.apache.beam.sdk.coders.CoderRegistrar;
+
+/**
+ * A {@link CoderRegistrar} for standard types used with {@link BigQueryIO}.
+ */
+@AutoService(CoderRegistrar.class)
+public class BigQueryCoderRegistrar implements CoderRegistrar {
+  @Override
+  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
+    return ImmutableMap.of(
+        TableRow.class, CoderFactories.forCoder(TableRowJsonCoder.of()),
+        TableRowInfo.class, CoderFactories.forCoder(TableRowInfoCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3f5947e..a13d61d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -41,7 +41,6 @@ import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid;

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 4142da9..53d395b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 4ddc1df..4d130b6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.Reshuffle;

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 5e8fa29..9ef947e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -26,7 +26,6 @@ import java.io.OutputStream;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 
 /**
  * Defines a coder for {@link TableRowInfo} objects.

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
new file mode 100644
index 0000000..ce4b669
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their 
native JSON format.
+ */
+public class TableRowJsonCoder extends AtomicCoder<TableRow> {
+
+  @JsonCreator
+  public static TableRowJsonCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(TableRow value, OutputStream outStream, Context context)
+      throws IOException {
+    String strValue = MAPPER.writeValueAsString(value);
+    StringUtf8Coder.of().encode(strValue, outStream, context);
+  }
+
+  @Override
+  public TableRow decode(InputStream inStream, Context context)
+      throws IOException {
+    String strValue = StringUtf8Coder.of().decode(inStream, context);
+    return MAPPER.readValue(strValue, TableRow.class);
+  }
+
+  @Override
+  protected long getEncodedElementByteSize(TableRow value, Context context)
+      throws Exception {
+    String strValue = MAPPER.writeValueAsString(value);
+    return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
+  // TableRow.
+  private static final ObjectMapper MAPPER =
+      new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+
+  private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
+  private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new 
TypeDescriptor<TableRow>() {};
+
+  private TableRowJsonCoder() { }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws NonDeterministicException always. A {@link TableRow} can hold 
arbitrary
+   *         {@link Object} instances, which makes the encoding 
non-deterministic.
+   */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(this,
+        "TableCell can hold arbitrary instances, which may be 
non-deterministic.");
+  }
+
+  @Override
+  public TypeDescriptor<TableRow> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index 91ef404..cb51158 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -26,7 +26,6 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
new file mode 100644
index 0000000..e7e9fe1
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link BigQueryCoderRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryCoderRegistrarTest {
+  @Test
+  public void testTableRowCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getDefaultCoder(TableRow.class);
+  }
+
+  @Test
+  public void testTableRowInfoCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getDefaultCoder(TableRowInfo.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 62c5b5f..8e1632f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingInput;

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index 6dfd9d7..43ad238 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -33,7 +33,6 @@ import java.util.NoSuchElementException;
 
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.options.BigQueryOptions;
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index cffd873..bef9a26 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -61,7 +61,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;

http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
new file mode 100644
index 0000000..f6e02dc
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test case for {@link TableRowJsonCoder}.
+ */
+@RunWith(JUnit4.class)
+public class TableRowJsonCoderTest {
+
+  private static class TableRowBuilder {
+    private TableRow row;
+    public TableRowBuilder() {
+      row = new TableRow();
+    }
+    public TableRowBuilder set(String fieldName, Object value) {
+      row.set(fieldName, value);
+      return this;
+    }
+    public TableRow build() {
+      return row;
+    }
+  }
+
+  private static final Coder<TableRow> TEST_CODER = TableRowJsonCoder.of();
+
+  private static final List<TableRow> TEST_VALUES = Arrays.asList(
+      new TableRowBuilder().build(),
+      new TableRowBuilder().set("a", "1").build(),
+      new TableRowBuilder().set("b", 3.14).build(),
+      new TableRowBuilder().set("a", "1").set("b", true).set("c", 
"hi").build());
+
+  @Test
+  public void testDecodeEncodeEqual() throws Exception {
+    for (TableRow value : TEST_VALUES) {
+      CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+    }
+  }
+
+  // This identifier should only change if the JSON format of results from the 
BigQuery API changes.
+  private static final String EXPECTED_ENCODING_ID = "";
+
+  @Test
+  public void testEncodingId() throws Exception {
+    CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
+  }
+
+  /**
+   * Generated data to check that the wire format has not changed. To 
regenerate, see
+   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   */
+  private static final List<String> TEST_ENCODINGS = Arrays.asList(
+      "e30",
+      "eyJhIjoiMSJ9",
+      "eyJiIjozLjE0fQ",
+      "eyJhIjoiMSIsImIiOnRydWUsImMiOiJoaSJ9");
+
+  @Test
+  public void testWireFormatEncode() throws Exception {
+    CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, 
TEST_ENCODINGS);
+  }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), 
equalTo(TypeDescriptor.of(TableRow.class)));
+  }
+}

Reply via email to