Repository: beam Updated Branches: refs/heads/master b47fd52cb -> 36a12d023
[BEAM-1786, BEAM-1871] Add the ability to register coder factories for classes allowing TableRowJsonCoder to move to sdks/java/io/google-cloud-platform Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c811f5e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c811f5e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c811f5e Branch: refs/heads/master Commit: 3c811f5eabc823c5895a7b3c62379370bff8b22c Parents: b47fd52 Author: Luke Cwik <lc...@google.com> Authored: Wed Apr 19 17:59:15 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Thu Apr 20 14:19:03 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/core/StateTags.java | 6 +- .../beam/runners/core/ReduceFnTester.java | 12 +-- .../apache/beam/runners/core/StateTagTest.java | 3 +- .../spark/translation/SparkRuntimeContext.java | 3 +- sdks/java/core/pom.xml | 5 - .../main/java/org/apache/beam/sdk/Pipeline.java | 3 +- .../apache/beam/sdk/coders/CoderFactories.java | 22 +++- .../apache/beam/sdk/coders/CoderRegistrar.java | 45 ++++++++ .../apache/beam/sdk/coders/CoderRegistry.java | 108 ++++++++++++++----- .../apache/beam/sdk/coders/StringUtf8Coder.java | 2 +- .../beam/sdk/coders/TableRowJsonCoder.java | 88 --------------- .../apache/beam/sdk/util/state/StateSpecs.java | 6 +- .../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 - .../beam/sdk/coders/CoderRegistryTest.java | 77 +++++++------ .../beam/sdk/coders/DefaultCoderTest.java | 8 +- .../beam/sdk/coders/TableRowJsonCoderTest.java | 94 ---------------- .../beam/sdk/transforms/LatestFnTest.java | 2 +- .../org/apache/beam/sdk/transforms/SumTest.java | 2 +- .../transforms/reflect/DoFnInvokersTest.java | 4 +- sdks/java/io/google-cloud-platform/pom.xml | 5 + .../io/gcp/bigquery/BigQueryCoderRegistrar.java | 39 +++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1 - .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 1 - .../io/gcp/bigquery/StreamingWriteTables.java | 1 - .../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 1 - .../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 91 ++++++++++++++++ .../sdk/io/gcp/bigquery/TableRowWriter.java | 1 - .../bigquery/BigQueryCoderRegistrarTest.java | 40 +++++++ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 1 - .../io/gcp/bigquery/FakeBigQueryServices.java | 1 - .../sdk/io/gcp/bigquery/FakeJobService.java | 1 - .../io/gcp/bigquery/TableRowJsonCoderTest.java | 95 ++++++++++++++++ 32 files changed, 479 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index 77ae8f5..3a45569 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -47,11 +47,7 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState; @Experimental(Kind.STATE) public class StateTags { - private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry(); - - static { - STANDARD_REGISTRY.registerStandardCoders(); - } + private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault(); /** @deprecated for migration purposes only */ @Deprecated http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 914550e..549fd8a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -180,8 +180,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { Coder<OutputT> outputCoder) throws Exception { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); + CoderRegistry registry = CoderRegistry.createDefault(); AppliedCombineFn<String, Integer, AccumT, OutputT> fn = AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder( combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); @@ -207,8 +206,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { Coder<OutputT> outputCoder) throws Exception { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); + CoderRegistry registry = CoderRegistry.createDefault(); AppliedCombineFn<String, Integer, AccumT, OutputT> fn = AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder( combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); @@ -230,8 +228,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { PipelineOptions options, SideInputReader sideInputReader) throws Exception { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); + CoderRegistry registry = CoderRegistry.createDefault(); AppliedCombineFn<String, Integer, AccumT, OutputT> fn = AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder( combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); @@ -254,8 +251,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { PipelineOptions options, SideInputReader sideInputReader) throws Exception { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); + CoderRegistry registry = CoderRegistry.createDefault(); AppliedCombineFn<String, Integer, AccumT, OutputT> fn = AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder( combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java index 0584643..5f5d92d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java @@ -158,8 +158,7 @@ public class StateTagTest { @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testCombiningValueWithContextEquality() { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); + CoderRegistry registry = CoderRegistry.createDefault(); Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers(); Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers(); http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 9d0f576..6abab17 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -112,8 +112,7 @@ public class SparkRuntimeContext implements Serializable { public CoderRegistry getCoderRegistry() { if (coderRegistry == null) { - coderRegistry = new CoderRegistry(); - coderRegistry.registerStandardCoders(); + coderRegistry = CoderRegistry.createDefault(); } return coderRegistry; } http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 2860be2..930632d 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -141,11 +141,6 @@ <dependency> <groupId>com.google.apis</groupId> - <artifactId>google-api-services-bigquery</artifactId> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> <artifactId>google-api-services-storage</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index e3b9309..0a1dc13 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -292,8 +292,7 @@ public class Pipeline { */ public CoderRegistry getCoderRegistry() { if (coderRegistry == null) { - coderRegistry = new CoderRegistry(); - coderRegistry.registerStandardCoders(); + coderRegistry = CoderRegistry.createDefault(); } return coderRegistry; } http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java index e1a202a..0031698 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.coders; +import com.google.common.base.MoreObjects; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -129,13 +130,13 @@ public final class CoderFactories { // Method to create a coder given component coders // For a Coder class of kind * -> * -> ... n times ... -> * // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T> - private Method factoryMethod; + private final Method factoryMethod; // Method to decompose a value of type T into its parts. // For a Coder class of kind * -> * -> ... n times ... -> * // this has type T -> List<Object> // where the list has n elements. - private Method getComponentsMethod; + private final Method getComponentsMethod; /** * Returns a CoderFactory that invokes the given static factory method @@ -248,6 +249,14 @@ public final class CoderFactories { "cannot build CoderFactory from class " + coderType + ": does not implement Coder<T> for any T."); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("factoryMethod", factoryMethod) + .add("getComponentsMethod", getComponentsMethod) + .toString(); + } } /** @@ -255,7 +264,7 @@ public final class CoderFactories { * {@link CoderFactory}. */ private static class CoderFactoryForCoder<T> implements CoderFactory { - private Coder<T> coder; + private final Coder<T> coder; public CoderFactoryForCoder(Coder<T> coder) { this.coder = coder; @@ -270,5 +279,12 @@ public final class CoderFactories { public List<Object> getInstanceComponents(Object value) { return Collections.emptyList(); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("coder", coder) + .toString(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java new file mode 100644 index 0000000..fced976 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import com.google.auto.service.AutoService; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.beam.sdk.annotations.Experimental; + +/** + * {@link Coder} creators have the ability to automatically have their + * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry + * and a concrete implementation of this interface. + * + * <p>It is optional but recommended to use one of the many build time tools such as + * {@link AutoService} to generate the necessary META-INF files automatically. + */ +@Experimental +public interface CoderRegistrar { + /** + * Returns a mapping of {@link Class classes} to {@link CoderFactory coder factories} which + * will be registered by default within each {@link CoderRegistry coder registry} instance. + * + * <p>See {@link CoderFactories} for convenience methods to construct a {@link CoderFactory}. + * + * <p>Note that a warning is logged if multiple {@link CoderRegistrar coder registrars} provide + * mappings for the same {@link Class}. + */ + Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java index 65f4209..6b909d4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java @@ -19,10 +19,14 @@ package org.apache.beam.sdk.coders; import static com.google.common.base.Preconditions.checkArgument; -import com.google.api.services.bigquery.model.TableRow; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @@ -35,13 +39,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.Set; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; @@ -63,8 +71,7 @@ import org.slf4j.LoggerFactory; * <li>A {@link Coder} class with the static methods to satisfy * {@link CoderFactories#fromStaticMethods} can be registered via * {@link #registerCoder(Class, Class)}.</li> - * <li>Built-in types are registered via - * {@link #registerStandardCoders()}.</li> + * <li>Types can be automatically registered via {@link CoderRegistrar coder registrars}.</li> * </ul> * <li>Annotations: {@link DefaultCoder} can be used to annotate a type with * the default {@code Coder} type. The {@link Coder} class must satisfy the requirements @@ -82,33 +89,86 @@ import org.slf4j.LoggerFactory; public class CoderRegistry implements CoderProvider { private static final Logger LOG = LoggerFactory.getLogger(CoderRegistry.class); + private static final Map<Class<?>, CoderFactory> REGISTERED_CODER_FACTORIES_PER_CLASS; + + static { + // Register the standard coders first so they are choosen as the default + Multimap<Class<?>, CoderFactory> codersToRegister = HashMultimap.create(); + codersToRegister.put(Byte.class, CoderFactories.fromStaticMethods(ByteCoder.class)); + codersToRegister.put(ByteString.class, CoderFactories.fromStaticMethods(ByteStringCoder.class)); + codersToRegister.put(Double.class, CoderFactories.fromStaticMethods(DoubleCoder.class)); + codersToRegister.put(Instant.class, CoderFactories.fromStaticMethods(InstantCoder.class)); + codersToRegister.put(Integer.class, CoderFactories.fromStaticMethods(VarIntCoder.class)); + codersToRegister.put(Iterable.class, CoderFactories.fromStaticMethods(IterableCoder.class)); + codersToRegister.put(KV.class, CoderFactories.fromStaticMethods(KvCoder.class)); + codersToRegister.put(List.class, CoderFactories.fromStaticMethods(ListCoder.class)); + codersToRegister.put(Long.class, CoderFactories.fromStaticMethods(VarLongCoder.class)); + codersToRegister.put(Map.class, CoderFactories.fromStaticMethods(MapCoder.class)); + codersToRegister.put(Set.class, CoderFactories.fromStaticMethods(SetCoder.class)); + codersToRegister.put(String.class, CoderFactories.fromStaticMethods(StringUtf8Coder.class)); + codersToRegister.put(TimestampedValue.class, + CoderFactories.fromStaticMethods(TimestampedValue.TimestampedValueCoder.class)); + codersToRegister.put(Void.class, CoderFactories.fromStaticMethods(VoidCoder.class)); + codersToRegister.put(byte[].class, CoderFactories.fromStaticMethods(ByteArrayCoder.class)); + codersToRegister.put(IntervalWindow.class, CoderFactories.forCoder(IntervalWindow.getCoder())); + + // Enumerate all the CoderRegistrars in a deterministic order, adding all coders to register + Set<CoderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE); + registrars.addAll(Lists.newArrayList( + ServiceLoader.load(CoderRegistrar.class, ReflectHelpers.findClassLoader()))); + for (CoderRegistrar registrar : registrars) { + for (Map.Entry<Class<?>, CoderFactory> entry + : registrar.getCoderFactoriesToUseForClasses().entrySet()) { + codersToRegister.put(entry.getKey(), entry.getValue()); + } + } + + // Warn the user if multiple coders want to be registered for the same class + Map<Class<?>, Collection<CoderFactory>> multipleRegistrations = + Maps.filterValues(codersToRegister.asMap(), new Predicate<Collection<CoderFactory>>() { + @Override + public boolean apply(@Nonnull Collection<CoderFactory> input) { + return input.size() > 1; + } + }); + for (Map.Entry<Class<?>, Collection<CoderFactory>> entry : multipleRegistrations.entrySet()) { + LOG.warn("Multiple CoderFactory registrations {} found for class {}, using {}.", + entry.getKey(), entry.getValue(), entry.getValue().iterator().next()); + } + + // Build a map choosing the first coder within the multimap as the default + ImmutableMap.Builder<Class<?>, CoderFactory> registeredCoderFactoriesPerClassBuilder = + ImmutableMap.builder(); + for (Map.Entry<Class<?>, Collection<CoderFactory>> entry + : codersToRegister.asMap().entrySet()) { + registeredCoderFactoriesPerClassBuilder.put( + entry.getKey(), entry.getValue().iterator().next()); + } + REGISTERED_CODER_FACTORIES_PER_CLASS = registeredCoderFactoriesPerClassBuilder.build(); + } + + /** + * Creates a CoderRegistry containing registrations for all standard coders part of the core Java + * Apache Beam SDK and also any registrations provided by {@link CoderRegistrar coder registrars}. + * + * <p>Multiple registrations for the same class result in the (in order of precedence): + * <ul> + * <li>Standard coder part of the core Apache Beam Java SDK being used.</li> + * <li>The coder from the {@link CoderRegistrar} with the lexicographically smallest + * {@link Class#getName() class name} being used.</li> + * </ul> + */ + public static CoderRegistry createDefault() { + return new CoderRegistry(); + } public CoderRegistry() { + coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS); setFallbackCoderProvider( CoderProviders.firstOf(ProtoCoder.coderProvider(), SerializableCoder.PROVIDER)); } - /** - * Registers standard Coders with this CoderRegistry. - */ public void registerStandardCoders() { - registerCoder(Byte.class, ByteCoder.class); - registerCoder(ByteString.class, ByteStringCoder.class); - registerCoder(Double.class, DoubleCoder.class); - registerCoder(Instant.class, InstantCoder.class); - registerCoder(Integer.class, VarIntCoder.class); - registerCoder(Iterable.class, IterableCoder.class); - registerCoder(KV.class, KvCoder.class); - registerCoder(List.class, ListCoder.class); - registerCoder(Long.class, VarLongCoder.class); - registerCoder(Map.class, MapCoder.class); - registerCoder(Set.class, SetCoder.class); - registerCoder(String.class, StringUtf8Coder.class); - registerCoder(TableRow.class, TableRowJsonCoder.class); - registerCoder(TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class); - registerCoder(Void.class, VoidCoder.class); - registerCoder(byte[].class, ByteArrayCoder.class); - registerCoder(IntervalWindow.class, IntervalWindow.getCoder()); } /** @@ -642,7 +702,7 @@ public class CoderRegistry implements CoderProvider { * The map of classes to the CoderFactories to use to create their * default Coders. */ - private Map<Class<?>, CoderFactory> coderFactoryMap = new HashMap<>(); + private Map<Class<?>, CoderFactory> coderFactoryMap; /** * A provider of coders for types where no coder is registered. http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java index ca7912c..cd124ef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java @@ -127,7 +127,7 @@ public class StringUtf8Coder extends AtomicCoder<String> { * the byte size of the encoding plus the encoded length prefix. */ @Override - protected long getEncodedElementByteSize(String value, Context context) + public long getEncodedElementByteSize(String value, Context context) throws Exception { if (value == null) { throw new CoderException("cannot encode a null String"); http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java deleted file mode 100644 index 5c0929c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.api.services.bigquery.model.TableRow; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. - */ -public class TableRowJsonCoder extends AtomicCoder<TableRow> { - - @JsonCreator - public static TableRowJsonCoder of() { - return INSTANCE; - } - - @Override - public void encode(TableRow value, OutputStream outStream, Context context) - throws IOException { - String strValue = MAPPER.writeValueAsString(value); - StringUtf8Coder.of().encode(strValue, outStream, context); - } - - @Override - public TableRow decode(InputStream inStream, Context context) - throws IOException { - String strValue = StringUtf8Coder.of().decode(inStream, context); - return MAPPER.readValue(strValue, TableRow.class); - } - - @Override - protected long getEncodedElementByteSize(TableRow value, Context context) - throws Exception { - String strValue = MAPPER.writeValueAsString(value); - return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context); - } - - ///////////////////////////////////////////////////////////////////////////// - - // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in - // TableRow. - private static final ObjectMapper MAPPER = - new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); - - private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder(); - private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {}; - - private TableRowJsonCoder() { } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. A {@link TableRow} can hold arbitrary - * {@link Object} instances, which makes the encoding non-deterministic. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "TableCell can hold arbitrary instances, which may be non-deterministic."); - } - - @Override - public TypeDescriptor<TableRow> getEncodedTypeDescriptor() { - return TYPE_DESCRIPTOR; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index 30a7a6d..dc647da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -38,11 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; @Experimental(Kind.STATE) public class StateSpecs { - private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry(); - - static { - STANDARD_REGISTRY.registerStandardCoders(); - } + private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault(); private StateSpecs() {} http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java index eed4457..b6e9205 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java @@ -39,7 +39,6 @@ public class SdkCoreApiSurfaceTest { ImmutableSet.of( "org.apache.beam", "com.google.api.client", - "com.google.api.services.bigquery", "com.google.api.services.storage", "com.google.protobuf", "com.fasterxml.jackson.annotation", http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 10177e7..774ca9d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -21,8 +21,10 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; +import com.google.auto.service.AutoService; import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Duration; import java.io.IOException; import java.io.InputStream; @@ -37,6 +39,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException; import org.apache.beam.sdk.coders.protobuf.ProtoCoder; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -67,11 +70,8 @@ public class CoderRegistryTest { @Rule public ExpectedException thrown = ExpectedException.none(); - public static CoderRegistry getStandardRegistry() { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); - return registry; - } + @Rule + public ExpectedLogs expectedLogs = ExpectedLogs.none(CoderRegistry.class); private static class SerializableClass implements Serializable { } @@ -80,7 +80,7 @@ public class CoderRegistryTest { @Test public void testSerializableFallbackCoderProvider() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); registry.setFallbackCoderProvider(SerializableCoder.PROVIDER); Coder<?> serializableCoder = registry.getDefaultCoder(SerializableClass.class); @@ -89,7 +89,7 @@ public class CoderRegistryTest { @Test public void testProtoCoderFallbackCoderProvider() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); // MessageA is a Protocol Buffers test message with syntax 2 assertEquals(registry.getDefaultCoder(MessageA.class), ProtoCoder.of(MessageA.class)); @@ -100,7 +100,7 @@ public class CoderRegistryTest { @Test public void testAvroFallbackCoderProvider() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); registry.setFallbackCoderProvider(AvroCoder.PROVIDER); Coder<?> avroCoder = registry.getDefaultCoder(NotSerializableClass.class); @@ -109,7 +109,7 @@ public class CoderRegistryTest { @Test public void testRegisterInstantiatedCoder() throws Exception { - CoderRegistry registry = new CoderRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); registry.registerCoder(MyValue.class, MyValueCoder.of()); assertEquals(registry.getDefaultCoder(MyValue.class), MyValueCoder.of()); } @@ -137,19 +137,19 @@ public class CoderRegistryTest { public void testRegisterInstantiatedCoderInvalidRawtype() throws Exception { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("may not be used with unspecialized generic classes"); - CoderRegistry registry = new CoderRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); registry.registerCoder(List.class, new MyListCoder()); } @Test public void testSimpleDefaultCoder() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); assertEquals(StringUtf8Coder.of(), registry.getDefaultCoder(String.class)); } @Test public void testSimpleUnknownDefaultCoder() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); thrown.expect(CannotProvideCoderException.class); thrown.expectMessage(allOf( containsString(UnknownType.class.getCanonicalName()), @@ -161,7 +161,7 @@ public class CoderRegistryTest { @Test public void testParameterizedDefaultListCoder() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TypeDescriptor<List<Integer>> listToken = new TypeDescriptor<List<Integer>>() {}; assertEquals(ListCoder.of(VarIntCoder.of()), registry.getDefaultCoder(listToken)); @@ -177,7 +177,7 @@ public class CoderRegistryTest { @Test public void testParameterizedDefaultMapCoder() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TypeDescriptor<Map<Integer, String>> mapToken = new TypeDescriptor<Map<Integer, String>>() {}; assertEquals(MapCoder.of(VarIntCoder.of(), StringUtf8Coder.of()), registry.getDefaultCoder(mapToken)); @@ -185,7 +185,7 @@ public class CoderRegistryTest { @Test public void testParameterizedDefaultNestedMapCoder() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TypeDescriptor<Map<Integer, Map<String, Double>>> mapToken = new TypeDescriptor<Map<Integer, Map<String, Double>>>() {}; assertEquals( @@ -195,21 +195,21 @@ public class CoderRegistryTest { @Test public void testParameterizedDefaultSetCoder() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TypeDescriptor<Set<Integer>> setToken = new TypeDescriptor<Set<Integer>>() {}; assertEquals(SetCoder.of(VarIntCoder.of()), registry.getDefaultCoder(setToken)); } @Test public void testParameterizedDefaultNestedSetCoder() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TypeDescriptor<Set<Set<Integer>>> setToken = new TypeDescriptor<Set<Set<Integer>>>() {}; assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), registry.getDefaultCoder(setToken)); } @Test public void testParameterizedDefaultCoderUnknown() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TypeDescriptor<List<UnknownType>> listUnknownToken = new TypeDescriptor<List<UnknownType>>() {}; thrown.expect(CannotProvideCoderException.class); @@ -223,7 +223,7 @@ public class CoderRegistryTest { @Test public void testTypeParameterInferenceForward() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); MyGenericClass<MyValue, List<MyValue>> instance = new MyGenericClass<MyValue, List<MyValue>>() {}; @@ -239,7 +239,7 @@ public class CoderRegistryTest { @Test public void testTypeParameterInferenceBackward() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); MyGenericClass<MyValue, List<MyValue>> instance = new MyGenericClass<MyValue, List<MyValue>>() {}; @@ -256,7 +256,7 @@ public class CoderRegistryTest { @Test public void testGetDefaultCoderFromIntegerValue() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); Integer i = 13; Coder<Integer> coder = registry.getDefaultCoder(i); assertEquals(VarIntCoder.of(), coder); @@ -264,13 +264,13 @@ public class CoderRegistryTest { @Test public void testGetDefaultCoderFromNullValue() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); assertEquals(VoidCoder.of(), registry.getDefaultCoder((Void) null)); } @Test public void testGetDefaultCoderFromKvValue() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); KV<Integer, String> kv = KV.of(13, "hello"); Coder<KV<Integer, String>> coder = registry.getDefaultCoder(kv); assertEquals(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()), @@ -279,7 +279,7 @@ public class CoderRegistryTest { @Test public void testGetDefaultCoderFromKvNullValue() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); KV<Void, Void> kv = KV.of((Void) null, (Void) null); assertEquals(KvCoder.of(VoidCoder.of(), VoidCoder.of()), registry.getDefaultCoder(kv)); @@ -287,7 +287,7 @@ public class CoderRegistryTest { @Test public void testGetDefaultCoderFromNestedKvValue() throws Exception { - CoderRegistry registry = getStandardRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); KV<Integer, KV<Long, KV<String, String>>> kv = KV.of(13, KV.of(17L, KV.of("hello", "goodbye"))); Coder<KV<Integer, KV<Long, KV<String, String>>>> coder = registry.getDefaultCoder(kv); assertEquals( @@ -346,8 +346,7 @@ public class CoderRegistryTest { @Test public void testDefaultCoderAnnotationGenericRawtype() throws Exception { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); + CoderRegistry registry = CoderRegistry.createDefault(); assertEquals( registry.getDefaultCoder(MySerializableGeneric.class), SerializableCoder.of(MySerializableGeneric.class)); @@ -355,8 +354,7 @@ public class CoderRegistryTest { @Test public void testDefaultCoderAnnotationGeneric() throws Exception { - CoderRegistry registry = new CoderRegistry(); - registry.registerStandardCoders(); + CoderRegistry registry = CoderRegistry.createDefault(); assertEquals( registry.getDefaultCoder(new TypeDescriptor<MySerializableGeneric<String>>() {}), SerializableCoder.of(MySerializableGeneric.class)); @@ -383,7 +381,7 @@ public class CoderRegistryTest { */ @Test public void testTypeVariableErrorMessage() throws Exception { - CoderRegistry registry = new CoderRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); thrown.expect(CannotProvideCoderException.class); thrown.expectMessage(allOf( @@ -399,7 +397,7 @@ public class CoderRegistryTest { @Test @SuppressWarnings("rawtypes") public void testSerializableTypeVariableDefaultCoder() throws Exception { - CoderRegistry registry = new CoderRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TypeDescriptor type = TypeDescriptor.of( TestSerializableGenericClass.class.getTypeParameters()[0]); @@ -540,4 +538,21 @@ public class CoderRegistryTest { @SuppressWarnings("unused") private T foo; } + + @Test + public void testAutomaticRegistrationOfCoders() throws Exception { + assertEquals(CoderRegistry.createDefault().getDefaultCoder(MyValue.class), MyValueCoder.of()); + } + + /** + * A {@link CoderRegistrar} to demonstrate default {@link Coder} registration. + */ + @AutoService(CoderRegistrar.class) + public static class RegisteredTestCoderRegistrar implements CoderRegistrar { + @Override + public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() { + return ImmutableMap.<Class<?>, CoderFactory>of( + MyValue.class, CoderFactories.forCoder(MyValueCoder.of())); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java index 59749ae..d335b18 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertThat; import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -41,12 +40,7 @@ public class DefaultCoderTest { @Rule public ExpectedException thrown = ExpectedException.none(); - public CoderRegistry registry = new CoderRegistry(); - - @Before - public void registerStandardCoders() { - registry.registerStandardCoders(); - } + public CoderRegistry registry = CoderRegistry.createDefault(); @DefaultCoder(AvroCoder.class) private static class AvroRecord { http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java deleted file mode 100644 index 5253ce7..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TableRowJsonCoderTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.api.services.bigquery.model.TableRow; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test case for {@link TableRowJsonCoder}. - */ -@RunWith(JUnit4.class) -public class TableRowJsonCoderTest { - - private static class TableRowBuilder { - private TableRow row; - public TableRowBuilder() { - row = new TableRow(); - } - public TableRowBuilder set(String fieldName, Object value) { - row.set(fieldName, value); - return this; - } - public TableRow build() { - return row; - } - } - - private static final Coder<TableRow> TEST_CODER = TableRowJsonCoder.of(); - - private static final List<TableRow> TEST_VALUES = Arrays.asList( - new TableRowBuilder().build(), - new TableRowBuilder().set("a", "1").build(), - new TableRowBuilder().set("b", 3.14).build(), - new TableRowBuilder().set("a", "1").set("b", true).set("c", "hi").build()); - - @Test - public void testDecodeEncodeEqual() throws Exception { - for (TableRow value : TEST_VALUES) { - CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value); - } - } - - // This identifier should only change if the JSON format of results from the BigQuery API changes. - private static final String EXPECTED_ENCODING_ID = ""; - - @Test - public void testEncodingId() throws Exception { - CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID); - } - - /** - * Generated data to check that the wire format has not changed. To regenerate, see - * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}. - */ - private static final List<String> TEST_ENCODINGS = Arrays.asList( - "e30", - "eyJhIjoiMSJ9", - "eyJiIjozLjE0fQ", - "eyJhIjoiMSIsImIiOnRydWUsImMiOiJoaSJ9"); - - @Test - public void testWireFormatEncode() throws Exception { - CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS); - } - - @Test - public void testEncodedTypeDescriptor() throws Exception { - assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(TableRow.class))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java index 31acb08..f49c765 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java @@ -196,7 +196,7 @@ public class LatestFnTest { public void testDefaultCoderHandlesNull() throws CannotProvideCoderException { Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); - CoderRegistry registry = new CoderRegistry(); + CoderRegistry registry = CoderRegistry.createDefault(); TimestampedValue.TimestampedValueCoder<Long> inputCoder = TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of()); http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java index b2f8aa8..9d2c6f6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java @@ -37,7 +37,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class SumTest { - private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry(); + private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault(); @Test public void testSumGetNames() { http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 8b4df4c..5732438 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -335,7 +335,7 @@ public class DoFnInvokersTest { when(fn.newTracker(restriction)).thenReturn(tracker); fn.processElement(mockProcessContext, tracker); - assertEquals(coder, invoker.invokeGetRestrictionCoder(new CoderRegistry())); + assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault())); assertEquals(restriction, invoker.invokeGetInitialRestriction("blah")); final List<SomeRestriction> outputs = new ArrayList<>(); invoker.invokeSplitRestriction( @@ -415,7 +415,7 @@ public class DoFnInvokersTest { MockFn fn = mock(MockFn.class); DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn); - CoderRegistry coderRegistry = new CoderRegistry(); + CoderRegistry coderRegistry = CoderRegistry.createDefault(); coderRegistry.registerCoder(RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class); assertThat( invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry), http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 4cd0337..3778a63 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -77,6 +77,11 @@ </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> <groupId>com.google.apis</groupId> <artifactId>google-api-services-bigquery</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java new file mode 100644 index 0000000..847c7b5 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.beam.sdk.coders.CoderFactories; +import org.apache.beam.sdk.coders.CoderFactory; +import org.apache.beam.sdk.coders.CoderRegistrar; + +/** + * A {@link CoderRegistrar} for standard types used with {@link BigQueryIO}. + */ +@AutoService(CoderRegistrar.class) +public class BigQueryCoderRegistrar implements CoderRegistrar { + @Override + public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() { + return ImmutableMap.of( + TableRow.class, CoderFactories.forCoder(TableRowJsonCoder.of()), + TableRowInfo.class, CoderFactories.forCoder(TableRowInfoCoder.of())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3f5947e..a13d61d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -41,7 +41,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid; http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 4142da9..53d395b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.NoSuchElementException; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java index 4ddc1df..4d130b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.Reshuffle; http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java index 5e8fa29..9ef947e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java @@ -26,7 +26,6 @@ import java.io.OutputStream; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; /** * Defines a coder for {@link TableRowInfo} objects. http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java new file mode 100644 index 0000000..ce4b669 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. + */ +public class TableRowJsonCoder extends AtomicCoder<TableRow> { + + @JsonCreator + public static TableRowJsonCoder of() { + return INSTANCE; + } + + @Override + public void encode(TableRow value, OutputStream outStream, Context context) + throws IOException { + String strValue = MAPPER.writeValueAsString(value); + StringUtf8Coder.of().encode(strValue, outStream, context); + } + + @Override + public TableRow decode(InputStream inStream, Context context) + throws IOException { + String strValue = StringUtf8Coder.of().decode(inStream, context); + return MAPPER.readValue(strValue, TableRow.class); + } + + @Override + protected long getEncodedElementByteSize(TableRow value, Context context) + throws Exception { + String strValue = MAPPER.writeValueAsString(value); + return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context); + } + + ///////////////////////////////////////////////////////////////////////////// + + // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in + // TableRow. + private static final ObjectMapper MAPPER = + new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + + private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder(); + private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {}; + + private TableRowJsonCoder() { } + + /** + * {@inheritDoc} + * + * @throws NonDeterministicException always. A {@link TableRow} can hold arbitrary + * {@link Object} instances, which makes the encoding non-deterministic. + */ + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "TableCell can hold arbitrary instances, which may be non-deterministic."); + } + + @Override + public TypeDescriptor<TableRow> getEncodedTypeDescriptor() { + return TYPE_DESCRIPTOR; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index 91ef404..cb51158 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -26,7 +26,6 @@ import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java new file mode 100644 index 0000000..e7e9fe1 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link BigQueryCoderRegistrar}. + */ +@RunWith(JUnit4.class) +public class BigQueryCoderRegistrarTest { + @Test + public void testTableRowCoderIsRegistered() throws Exception { + CoderRegistry.createDefault().getDefaultCoder(TableRow.class); + } + + @Test + public void testTableRowInfoCoderIsRegistered() throws Exception { + CoderRegistry.createDefault().getDefaultCoder(TableRowInfo.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 62c5b5f..8e1632f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -73,7 +73,6 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingInput; http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java index 6dfd9d7..43ad238 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java @@ -33,7 +33,6 @@ import java.util.NoSuchElementException; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.options.BigQueryOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index cffd873..bef9a26 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -61,7 +61,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; http://git-wip-us.apache.org/repos/asf/beam/blob/3c811f5e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java new file mode 100644 index 0000000..f6e02dc --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.api.services.bigquery.model.TableRow; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test case for {@link TableRowJsonCoder}. + */ +@RunWith(JUnit4.class) +public class TableRowJsonCoderTest { + + private static class TableRowBuilder { + private TableRow row; + public TableRowBuilder() { + row = new TableRow(); + } + public TableRowBuilder set(String fieldName, Object value) { + row.set(fieldName, value); + return this; + } + public TableRow build() { + return row; + } + } + + private static final Coder<TableRow> TEST_CODER = TableRowJsonCoder.of(); + + private static final List<TableRow> TEST_VALUES = Arrays.asList( + new TableRowBuilder().build(), + new TableRowBuilder().set("a", "1").build(), + new TableRowBuilder().set("b", 3.14).build(), + new TableRowBuilder().set("a", "1").set("b", true).set("c", "hi").build()); + + @Test + public void testDecodeEncodeEqual() throws Exception { + for (TableRow value : TEST_VALUES) { + CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value); + } + } + + // This identifier should only change if the JSON format of results from the BigQuery API changes. + private static final String EXPECTED_ENCODING_ID = ""; + + @Test + public void testEncodingId() throws Exception { + CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID); + } + + /** + * Generated data to check that the wire format has not changed. To regenerate, see + * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}. + */ + private static final List<String> TEST_ENCODINGS = Arrays.asList( + "e30", + "eyJhIjoiMSJ9", + "eyJiIjozLjE0fQ", + "eyJhIjoiMSIsImIiOnRydWUsImMiOiJoaSJ9"); + + @Test + public void testWireFormatEncode() throws Exception { + CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(TableRow.class))); + } +}